GitHub user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2704#discussion_r193436151
--- Diff:
storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
---
@@ -12,28 +12,40 @@
package org.apache.storm.messaging.netty;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import java.util.Map;
import org.apache.storm.Config;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-
-class StormServerPipelineFactory implements ChannelPipelineFactory {
- private Server server;
-
- StormServerPipelineFactory(Server server) {
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+
+class StormServerPipelineFactory extends ChannelInitializer<Channel> {
+ private final KryoValuesSerializer ser;
+ private final KryoValuesDeserializer deser;
+ private final Map<String, Object> topoConf;
+ private final Server server;
+
+ StormServerPipelineFactory(KryoValuesSerializer ser,
KryoValuesDeserializer deser,
+ Map<String, Object> topoConf, Server server) {
+ this.ser = ser;
+ this.deser = deser;
+ this.topoConf = topoConf;
this.server = server;
}
- public ChannelPipeline getPipeline() throws Exception {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
// Create a default pipeline implementation.
- ChannelPipeline pipeline = Channels.pipeline();
-
+ ChannelPipeline pipeline = ch.pipeline();
+
// Decoder
--- End diff --
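nit: whitespace (the blank line after `ChannelPipeline pipeline = ch.pipeline();` appears to be a whitespace-only change)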
---