This is an automated email from the ASF dual-hosted git repository. upthewaterspout pushed a commit to branch feature/redis-performance-testing in repository https://gitbox.apache.org/repos/asf/geode.git
commit 420b28ce48cc8184703a11da86a444a7819fc10f Author: Jens Deppe <[email protected]> AuthorDate: Mon Mar 15 15:03:52 2021 -0700 Add RedisDecoder to the netty pipeline --- .../internal/netty/MessageToCommandDecoder.java | 61 ++++++++++++++++++++++ .../redis/internal/netty/NettyRedisServer.java | 9 +++- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java new file mode 100644 index 0000000..3f0685c --- /dev/null +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.redis.internal.netty; + +import java.util.ArrayList; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.redis.ArrayHeaderRedisMessage; +import io.netty.handler.codec.redis.FullBulkStringRedisMessage; +import io.netty.handler.codec.redis.RedisMessage; + +public class MessageToCommandDecoder extends MessageToMessageDecoder<RedisMessage> { + + private static final ThreadLocal<LocalRedisArray> currentCommand = + ThreadLocal.withInitial(LocalRedisArray::new); + + @Override + protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) + throws Exception { + LocalRedisArray array = currentCommand.get(); + + if (msg instanceof ArrayHeaderRedisMessage) { + array.count = ((ArrayHeaderRedisMessage) msg).length(); + return; + } + + if (msg instanceof FullBulkStringRedisMessage) { + ByteBuf buffer = ((FullBulkStringRedisMessage) msg).content(); + byte[] data = new byte[buffer.readableBytes()]; + buffer.readBytes(data); + array.list.add(data); + array.count--; + + if (array.count == 0) { + out.add(new Command(array.list)); + currentCommand.remove(); + } + } + } + + private static class LocalRedisArray { + List<byte[]> list = new ArrayList<>(); + long count = 0; + } +} diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java index 3a1d903..5110696 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java @@ -43,6 +43,8 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.redis.RedisBulkStringAggregator; +import io.netty.handler.codec.redis.RedisDecoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.timeout.WriteTimeoutHandler; @@ -169,8 +171,11 @@ public class NettyRedisServer { } ChannelPipeline pipeline = socketChannel.pipeline(); addSSLIfEnabled(socketChannel, pipeline); - pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(), - new ByteToCommandDecoder(redisStats)); + // pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(), + // new ByteToCommandDecoder(redisStats)); + pipeline.addLast(new RedisDecoder()); + pipeline.addLast(new RedisBulkStringAggregator()); + pipeline.addLast(new MessageToCommandDecoder()); pipeline.addLast(new WriteTimeoutHandler(10)); pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(), new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,
