http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java new file mode 100644 index 0000000..bbd33ea --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.channel; + +import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; + +public class ChannelHandlerContextWrapperImpl<ChannelHandlerContext> implements ChannelHandlerContextWrapper { + + private io.netty.channel.ChannelHandlerContext context; + + public ChannelHandlerContextWrapperImpl(io.netty.channel.ChannelHandlerContext context) { + this.context = context; + } + + public io.netty.channel.ChannelHandlerContext getContext() { + return context; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java new file mode 100644 index 0000000..b90afc1 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.channel; + +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; +import java.io.IOException; +import java.nio.channels.WritableByteChannel; +import org.apache.rocketmq.remoting.api.channel.ChunkRegion; + +public class FileRegionImpl extends AbstractReferenceCounted implements FileRegion { + private final ChunkRegion chunkRegion; + + public FileRegionImpl(ChunkRegion chunkRegion) { + this.chunkRegion = chunkRegion; + } + + @Override + public long position() { + return chunkRegion.position(); + } + + @Override + public long transfered() { + return chunkRegion.transferred(); + } + + @Override + public long transferred() { + return chunkRegion.transferred(); + } + + @Override + public long count() { + return chunkRegion.count(); + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + return chunkRegion.transferTo(target, position); + } + + @Override + protected void deallocate() { + chunkRegion.release(); + } + + @Override + public FileRegion retain() { + super.retain(); + return this; + } + + @Override + public FileRegion retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public FileRegion touch() { + return this; + } + + @Override + public FileRegion touch(Object hint) { + return this; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java new file mode 100644 index 0000000..ba4a969 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.channel; + +import io.netty.channel.Channel; +import java.net.SocketAddress; +import org.apache.rocketmq.remoting.api.channel.ChunkRegion; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; + +public class NettyChannelImpl implements RemotingChannel { + private final io.netty.channel.Channel channel; + + public NettyChannelImpl(Channel channel) { + this.channel = channel; + } + + @Override + public SocketAddress localAddress() { + return channel.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return channel.remoteAddress(); + } + + @Override + public boolean isWritable() { + return channel.isWritable(); + } + + @Override + public boolean isActive() { + return channel.isActive(); + } + + @Override + public void close() { + channel.close(); + } + + @Override + public void reply(final RemotingCommand command) { + channel.writeAndFlush(command); + } + + @Override + public void reply(final ChunkRegion fileRegion) { + channel.writeAndFlush(fileRegion); + } + + public io.netty.channel.Channel getChannel() { + return channel; + } + + @Override + public boolean equals(final Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + final NettyChannelImpl that = (NettyChannelImpl) o; + + return channel != null ? channel.equals(that.channel) : that.channel == null; + + } + + @Override + public int hashCode() { + return channel != null ? channel.hashCode() : 0; + } + + @Override + public String toString() { + return "NettyChannelImpl [channel=" + channel + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java new file mode 100644 index 0000000..44d4fd9 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.command; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Map.Entry; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.command.TrafficType; +import org.apache.rocketmq.remoting.api.exception.RemoteCodecException; + +public class CodecHelper { + //ProtocolType + TotalLength + RequestId + SerializeType + TrafficType + CodeLength + RemarkLength + PropertiesSize + ParameterLength + public final static int MIN_PROTOCOL_LEN = 1 + 4 + 4 + 1 + 1 + 2 + 2 + 2 + 4; + public final static char PROPERTY_SEPARATOR = '\n'; + public final static Charset REMOTING_CHARSET = Charset.forName("UTF-8"); + + public final static int CODE_MAX_LEN = 512; + public final static int PARAMETER_MAX_LEN = 33554432; + public final static int BODY_MAX_LEN = 33554432; + public final static int PACKET_MAX_LEN = 33554432; + + public static ByteBuffer encodeHeader(final RemotingCommand command, final int parameterLength, + final int extraPayload) { + byte[] code = command.opCode().getBytes(REMOTING_CHARSET); + int codeLength = code.length; + + byte[] remark = command.remark().getBytes(REMOTING_CHARSET); + int remarkLength = remark.length; + + byte[][] props = null; + int propsLength = 0; + StringBuilder sb = new StringBuilder(); + if (!command.properties().isEmpty()) { + props = new byte[command.properties().size()][]; + int i = 0; + for (Entry<String, String> next : command.properties().entrySet()) { + sb.setLength(0); + sb.append(next.getKey()); + sb.append(PROPERTY_SEPARATOR); + sb.append(next.getValue()); + + props[i] = sb.toString().getBytes(REMOTING_CHARSET); + + propsLength += 2; + propsLength += props[i].length; + i++; + } + } + + int totalLength = MIN_PROTOCOL_LEN - 1 - 4 + + codeLength + + remarkLength + + propsLength + + parameterLength + + extraPayload; + + int headerLength = 1 + 4 + totalLength - parameterLength - extraPayload; + + ByteBuffer buf = ByteBuffer.allocate(headerLength); + buf.put(command.protocolType()); + buf.putInt(totalLength); + buf.putInt(command.requestID()); + buf.put(command.serializerType()); + buf.put((byte) command.trafficType().ordinal()); + + buf.putShort((short) codeLength); + if (codeLength > 0) { + buf.put(code); + } + buf.putShort((short) remarkLength); + if (remarkLength > 0) { + buf.put(remark); + } + if (props != null) { + buf.putShort((short) props.length); + for (byte[] prop : props) { + buf.putShort((short) prop.length); + buf.put(prop); + } + } else { + buf.putShort((short) 0); + } + + buf.putInt(parameterLength); + + buf.flip(); + + return buf; + } + + public static RemotingCommand decode(final ByteBuffer byteBuffer) { + RemotingCommandImpl cmd = new RemotingCommandImpl(); + int totalLength = byteBuffer.limit(); + cmd.requestID(byteBuffer.getInt()); + cmd.serializerType(byteBuffer.get()); + cmd.trafficType(TrafficType.parse(byteBuffer.get())); + + { + short size = byteBuffer.getShort(); + if (size > 0 && size <= CODE_MAX_LEN) { + byte[] bytes = new byte[size]; + byteBuffer.get(bytes); + String str = new String(bytes, REMOTING_CHARSET); + cmd.opCode(str); + } else { + throw new RemoteCodecException(String.format("Code length: %d over max limit: %d", size, CODE_MAX_LEN)); + } + } + + { + short size = byteBuffer.getShort(); + if (size > 0) { + byte[] bytes = new byte[size]; + byteBuffer.get(bytes); + String str = new String(bytes, REMOTING_CHARSET); + cmd.remark(str); + } + } + + { + short size = byteBuffer.getShort(); + if (size > 0) { + for (int i = 0; i < size; i++) { + short length = byteBuffer.getShort(); + if (length > 0) { + byte[] bytes = new byte[length]; + byteBuffer.get(bytes); + String str = new String(bytes, REMOTING_CHARSET); + int index = str.indexOf(PROPERTY_SEPARATOR); + if (index > 0) { + String key = str.substring(0, index); + String value = str.substring(index + 1); + cmd.property(key, value); + } + } + } + } + } + + { + int size = byteBuffer.getInt(); + if (size > 0 && size <= PARAMETER_MAX_LEN) { + byte[] bytes = new byte[size]; + byteBuffer.get(bytes); + cmd.parameterBytes(bytes); + } else if (size != 0) { + throw new RemoteCodecException(String.format("Parameter size: %d over max limit: %d", size, PARAMETER_MAX_LEN)); + } + } + + { + int size = totalLength - byteBuffer.position(); + if (size > 0 && size <= BODY_MAX_LEN) { + byte[] bytes = new byte[size]; + byteBuffer.get(bytes); + cmd.extraPayload(bytes); + } else if (size != 0) { + throw new RemoteCodecException(String.format("Body size: %d over max limit: %d", size, BODY_MAX_LEN)); + } + } + + return cmd; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java new file mode 100644 index 0000000..f5d2126 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.command; + +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; +import org.apache.rocketmq.remoting.api.command.TrafficType; +import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta; + +public class RemotingCommandFactoryImpl implements RemotingCommandFactory { + private RemotingCommandFactoryMeta remotingCommandFactoryMeta; + + public RemotingCommandFactoryImpl() { + this(new RemotingCommandFactoryMeta()); + } + + public RemotingCommandFactoryImpl(final RemotingCommandFactoryMeta remotingCommandFactoryMeta) { + this.remotingCommandFactoryMeta = remotingCommandFactoryMeta; + } + + @Override + public RemotingCommand createRequest() { + RemotingCommand request = new RemotingCommandImpl(); + request.protocolType(this.remotingCommandFactoryMeta.getProtocolType()); + request.serializerType(this.remotingCommandFactoryMeta.getSerializeType()); + return request; + } + + @Override + public RemotingCommand createResponse(final RemotingCommand command) { + RemotingCommand response = new RemotingCommandImpl(); + response.requestID(command.requestID()); + response.protocolType(command.protocolType()); + response.serializerType(command.serializerType()); + response.trafficType(TrafficType.RESPONSE); + return response; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java new file mode 100644 index 0000000..bcf2338 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.command; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.command.TrafficType; +import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; +import org.apache.rocketmq.remoting.common.TypePresentation; + +public class RemotingCommandImpl implements RemotingCommand { + public final static RequestIdGenerator REQUEST_ID_GENERATOR = RequestIdGenerator.inst; + + private byte protocolType; + private byte serializeType; + + private volatile int requestId = REQUEST_ID_GENERATOR.incrementAndGet(); + private TrafficType trafficType = TrafficType.REQUEST_SYNC; + private String code = CommandFlag.SUCCESS.flag(); + private String remark = ""; + private Map<String, String> properties = new HashMap<String, String>(); + private Object parameter; + private byte[] extraPayload; + + private byte[] parameterByte; + + protected RemotingCommandImpl() { + } + + @Override + public byte protocolType() { + return this.protocolType; + } + + @Override + public void protocolType(byte value) { + this.protocolType = value; + } + + @Override + public int requestID() { + return requestId; + } + + @Override + public void requestID(int value) { + this.requestId = value; + } + + @Override + public byte serializerType() { + return this.serializeType; + } + + @Override + public void serializerType(byte value) { + this.serializeType = value; + } + + @Override + public TrafficType trafficType() { + return this.trafficType; + } + + @Override + public void trafficType(TrafficType value) { + this.trafficType = value; + } + + @Override + public String opCode() { + return this.code; + } + + @Override + public void opCode(String value) { + this.code = value; + } + + @Override + public String remark() { + return this.remark; + } + + @Override + public void remark(String value) { + this.remark = value; + } + + @Override + public Map<String, String> properties() { + return this.properties; + } + + @Override + public void properties(Map<String, String> value) { + this.properties = value; + } + + @Override + public String property(String key) { + return this.properties.get(key); + } + + @Override + public void property(String key, String value) { + this.properties.put(key, value); + } + + @Override + public Object parameter() { + return this.parameter; + } + + @Override + public void parameter(Object value) { + this.parameter = value; + } + + @Override + public byte[] parameterBytes() { + return this.getParameterByte(); + } + + public byte[] getParameterByte() { + return parameterByte; + } + + public void setParameterByte(byte[] parameterByte) { + this.parameterByte = parameterByte; + } + + @Override + public void parameterBytes(byte[] value) { + this.setParameterByte(value); + } + + @Override + public byte[] extraPayload() { + return this.extraPayload; + } + + @Override + public void extraPayload(byte[] value) { + this.extraPayload = value; + } + + @Override + public <T> T parameter(SerializerFactory serializerFactory, Class<T> c) { + if (this.parameter() != null) + return (T) this.parameter(); + final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), c); + this.parameter(decode); + return decode; + } + + @Override + public <T> T parameter(SerializerFactory serializerFactory, TypePresentation<T> typePresentation) { + if (this.parameter() != null) + return (T) this.parameter(); + final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), typePresentation); + this.parameter(decode); + return decode; + } + + @Override + public <T> T parameter(SerializerFactory serializerFactory, Type type) { + if (this.parameter() != null) + return (T) this.parameter(); + final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), type); + this.parameter(decode); + return decode; + } + + @Override + public int hashCode() { + return HashCodeBuilder.reflectionHashCode(this); + } + + @Override + public boolean equals(Object o) { + return EqualsBuilder.reflectionEquals(this, o); + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java new file mode 100644 index 0000000..9b85c95 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.command; + +import java.util.concurrent.atomic.AtomicInteger; + +public class RequestIdGenerator { + public static RequestIdGenerator inst = new RequestIdGenerator(); + + private AtomicInteger generator = new AtomicInteger(0); + + private RequestIdGenerator() { + + } + + public int incrementAndGet() { + return generator.incrementAndGet(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java new file mode 100644 index 0000000..ec9cece --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import io.netty.channel.Channel; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class NettyChannelEvent { + private final Channel channel; + private final NettyChannelEventType type; + private final Throwable cause; + + public NettyChannelEvent(NettyChannelEventType type, Channel channel) { + this(type, channel, null); + } + + public NettyChannelEvent(NettyChannelEventType type, Channel channel, Throwable cause) { + this.type = type; + this.channel = channel; + this.cause = cause; + } + + public NettyChannelEventType getType() { + return type; + } + + public Channel getChannel() { + return channel; + } + + public Throwable getCause() { + return cause; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java new file mode 100644 index 0000000..1bf2277 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +public enum NettyChannelEventType { + ACTIVE, + INACTIVE, + IDLE, + EXCEPTION +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java new file mode 100644 index 0000000..1af62cb --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingEndPoint; +import org.apache.rocketmq.remoting.api.RemotingService; +import org.apache.rocketmq.remoting.api.RequestProcessor; +import org.apache.rocketmq.remoting.api.channel.ChannelEventListener; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; +import org.apache.rocketmq.remoting.api.command.TrafficType; +import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; +import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext; +import org.apache.rocketmq.remoting.api.interceptor.Interceptor; +import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; +import org.apache.rocketmq.remoting.api.interceptor.RequestContext; +import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; +import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory; +import org.apache.rocketmq.remoting.api.serializable.Serializer; +import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; +import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup; +import org.apache.rocketmq.remoting.common.Pair; +import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta; +import org.apache.rocketmq.remoting.common.ResponseResult; +import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; +import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.external.ThreadUtils; +import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; +import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; +import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl; +import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; +import org.apache.rocketmq.remoting.internal.UIDGenerator; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class NettyRemotingAbstract implements RemotingService { + protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class); + protected final ProtocolFactory protocolFactory = new ProtocolFactoryImpl(); + protected final SerializerFactory serializerFactory = new SerializerFactoryImpl(); + protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor"); + private final Semaphore semaphoreOneway; + private final Semaphore semaphoreAsync; + private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256); + private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>(); + private final AtomicLong count = new AtomicLong(0); + private final RemotingCommandFactory remotingCommandFactory; + private final String remotingInstanceId = UIDGenerator.instance().createUID(); + + private final ExecutorService publicExecutor; + protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true); + private InterceptorGroup interceptorGroup = new InterceptorGroup(); + private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup(); + + NettyRemotingAbstract(RemotingConfig clientConfig) { + this(clientConfig, new RemotingCommandFactoryMeta()); + } + + NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { + this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); + this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); + this.publicExecutor = ThreadUtils.newThreadPoolExecutor(clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(), 60, + TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000), "PublicExecutor", true); + this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); + } + + public SerializerFactory getSerializerFactory() { + return serializerFactory; + } + + protected void putNettyEvent(final NettyChannelEvent event) { + this.channelEventExecutor.putNettyEvent(event); + } + + protected void startUpHouseKeepingService() { + this.houseKeepingService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + scanResponseTable(); + } + }, 3000, 1000, TimeUnit.MICROSECONDS); + } + + @Override + public void start() { + if (this.channelEventListenerGroup.size() > 0) { + this.channelEventExecutor.start(); + } + } + + @Override + public void stop() { + ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS); + ThreadUtils.shutdownGracefully(channelEventExecutor); + } + + protected void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand command) throws Exception { + if (command != null) { + switch (command.trafficType()) { + case REQUEST_ONEWAY: + case REQUEST_ASYNC: + case REQUEST_SYNC: + processRequestCommand(ctx, command); + break; + case RESPONSE: + processResponseCommand(ctx, command); + break; + default: + LOG.warn("Not supported The traffic type {} !", command.trafficType()); + break; + } + } + } + + public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { + Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.opCode()); + + RemotingChannel channel = new NettyChannelImpl(ctx.channel()); + + Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel); + + try { + processorExecutorPair.getRight().submit(run); + } catch (RejectedExecutionException e) { + if ((System.currentTimeMillis() % 10000) == 0) { + LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd, + extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString())); + } + + if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { + interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, + extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL")); + + RemotingCommand response = remotingCommandFactory.createResponse(cmd); + response.opCode(RemotingCommand.CommandFlag.ERROR.flag()); + response.remark("SYSTEM_BUSY"); + writeAndFlush(ctx.channel(), response); + } + } + } + + @NotNull + private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd, + final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) { + return new Runnable() { + @Override + public void run() { + try { + interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE, + extractRemoteAddress(ctx.channel()), cmd)); + + RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd); + + interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE, + extractRemoteAddress(ctx.channel()), cmd, response)); + + handleResponse(response, cmd, ctx); + } catch (Throwable e) { + LOG.error(String.format("Process request %s error !", cmd.toString()), e); + + handleException(e, cmd, ctx); + } + } + }; + } + + private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) { + if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { + //FiXME Exception interceptor can not throw exception + interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, "")); + RemotingCommand response = remotingCommandFactory.createResponse(cmd); + response.opCode(RemotingCommand.CommandFlag.ERROR.flag()); + response.remark(serializeException(cmd.serializerType(), e)); + response.property("Exception", e.getClass().getName()); + ctx.writeAndFlush(response); + } + } + + private String serializeException(byte serializeType, Throwable exception) { + final Serializer serialization = getSerializerFactory().get(serializeType); + return serialization.encode(exception).toString(); + } + + private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) { + if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { + if (response != null) { + try { + writeAndFlush(ctx.channel(), response); + } catch (Throwable e) { + LOG.error(String.format("Process request %s success, but transfer response %s failed !", + cmd.toString(), response.toString()), e); + } + } + } + + } + + private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { + final ResponseResult responseResult = ackTables.get(cmd.requestID()); + if (responseResult != null) { + responseResult.setResponseCommand(cmd); + responseResult.release(); + + long time = System.currentTimeMillis(); + ackTables.remove(cmd.requestID()); + if (count.incrementAndGet() % 5000 == 0) + LOG.warn("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(), + ackTables.size()); + if (responseResult.getAsyncHandler() != null) { + boolean sameThread = false; + ExecutorService executor = this.getCallbackExecutor(); + if (executor != null) { + try { + executor.submit(new Runnable() { + @Override + public void run() { + try { + responseResult.executeCallbackArrived(responseResult.getResponseCommand()); + } catch (Throwable e) { + LOG.warn("Execute callback error !", e); + } + } + }); + } catch (RejectedExecutionException e) { + sameThread = true; + LOG.warn("Execute submit error !", e); + } + } else { + sameThread = true; + } + + if (sameThread) { + try { + responseResult.executeCallbackArrived(responseResult.getResponseCommand()); + } catch (Throwable e) { + LOG.warn("Execute callback in response thread error !", e); + } + } + } else { + responseResult.putResponse(cmd); + } + } else { + LOG.warn("request {} from {} has not matched response !", cmd, extractRemoteAddress(ctx.channel())); + } + } + + private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) { + channel.writeAndFlush(msg).addListener(listener); + } + + private void writeAndFlush(final Channel channel, final Object msg) { + channel.writeAndFlush(msg); + } + + public ExecutorService getCallbackExecutor() { + return this.publicExecutor; + } + + void scanResponseTable() { + /* + Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Integer, ResponseResult> next = iterator.next(); + ResponseResult result = next.getValue(); + + if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) { + iterator.remove(); + try { + long timeoutMillis = result.getTimeoutMillis(); + long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp(); + result.onTimeout(timeoutMillis, costTimeMillis); + LOG.error("scan response table command {} failed", result.getRequestId()); + } catch (Throwable e) { + LOG.warn("Error occurred when execute timeout callback !", e); + } finally { + result.release(); + LOG.warn("Removed timeout request {} ", result); + } + } + } + */ + } + + public RemotingCommand invokeWithInterceptor(final Channel channel, final RemotingCommand request, + long timeoutMillis) { + request.trafficType(TrafficType.REQUEST_SYNC); + + final String remoteAddr = extractRemoteAddress(channel); + + //FIXME try catch here + this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); + + RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis); + + this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, + extractRemoteAddress(channel), request, responseCommand)); + + return responseCommand; + } + + private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request, + final long timeoutMillis) { + try { + final int opaque = request.requestID(); + final ResponseResult responseResult = new ResponseResult(opaque, timeoutMillis); + responseResult.setRequestCommand(request); + //FIXME one interceptor for all case ? + responseResult.setInterceptorGroup(this.interceptorGroup); + responseResult.setRemoteAddr(remoteAddr); + + this.ackTables.put(opaque, responseResult); + + ChannelFutureListener listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + if (f.isSuccess()) { + responseResult.setSendRequestOK(true); + return; + } else { + responseResult.setSendRequestOK(false); + + ackTables.remove(opaque); + responseResult.setCause(f.cause()); + responseResult.putResponse(null); + + LOG.warn("Send request command to {} failed !", remoteAddr); + } + } + }; + + this.writeAndFlush(channel, request, listener); + + RemotingCommand responseCommand = responseResult.waitResponse(timeoutMillis); + + if (null == responseCommand) { + if (responseResult.isSendRequestOK()) { + throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause()); + } + /* + else { + throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause()); + }*/ + } + + return responseCommand; + } finally { + this.ackTables.remove(request.requestID()); + } + } + + public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request, + final AsyncHandler invokeCallback, long timeoutMillis) { + request.trafficType(TrafficType.REQUEST_ASYNC); + + final String remoteAddr = extractRemoteAddress(channel); + + this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); + + Exception exception = null; + + try { + this.invokeAsync0(remoteAddr, channel, request, timeoutMillis, invokeCallback); + } catch (InterruptedException e) { + exception = e; + } finally { + if (null != exception) { + try { + this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION")); + } catch (Throwable e) { + LOG.warn("onException ", e); + } + } + } + } + + private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request, + final long timeoutMillis, final AsyncHandler invokeCallback) throws InterruptedException { + boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + if (acquired) { + final int requestID = request.requestID(); + + SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); + + final ResponseResult responseResult = new ResponseResult(request.requestID(), timeoutMillis, invokeCallback, once); + responseResult.setRequestCommand(request); + responseResult.setInterceptorGroup(this.interceptorGroup); + responseResult.setRemoteAddr(remoteAddr); + + this.ackTables.put(request.requestID(), responseResult); + try { + ChannelFutureListener listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + responseResult.setSendRequestOK(f.isSuccess()); + if (f.isSuccess()) { + return; + } + + responseResult.putResponse(null); + ackTables.remove(requestID); + try { + responseResult.executeRequestSendFailed(); + } catch (Throwable e) { + LOG.warn("Execute callback error !", e); + } finally { + responseResult.release(); + } + + LOG.warn("Send request command to channel failed.", remoteAddr); + } + }; + + this.writeAndFlush(channel, request, listener); + } catch (Exception e) { + responseResult.release(); + LOG.error("Send request command to channel " + channel + " error !", e); + } + } else { + String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d", + timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits()); + LOG.error(info); + throw new RemoteTimeoutException(info); + } + } + + public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request, long timeoutMillis) { + request.trafficType(TrafficType.REQUEST_ONEWAY); + + this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request)); + + Exception exception = null; + + try { + this.invokeOneway0(channel, request, timeoutMillis); + } catch (InterruptedException e) { + exception = e; + } finally { + if (null != exception) { + try { + this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION")); + } catch (Throwable e) { + LOG.warn("onException ", e); + } + } + } + } + + private void invokeOneway0(final Channel channel, final RemotingCommand request, + final long timeoutMillis) throws InterruptedException { + boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + if (acquired) { + final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); + try { + final SocketAddress socketAddress = channel.remoteAddress(); + + ChannelFutureListener listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + once.release(); + if (!f.isSuccess()) { + LOG.warn("Send request command to channel {} failed !", socketAddress); + } + } + }; + + this.writeAndFlush(channel, request, listener); + } catch (Exception e) { + once.release(); + LOG.error("Send request command to channel " + channel + " error !", e); + } + } else { + String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d", + timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits()); + LOG.error(info); + throw new RemoteTimeoutException(info); + } + } + + public String getRemotingInstanceId() { + return remotingInstanceId; + } + + @Override + public ProtocolFactory protocolFactory() { + return this.protocolFactory; + } + + @Override + public SerializerFactory serializerFactory() { + return this.serializerFactory; + } + + @Override + public RemotingCommandFactory commandFactory() { + return this.remotingCommandFactory; + } + + @Override + public void registerRequestProcessor(String requestCode, RequestProcessor processor, ExecutorService executor) { + Pair<RequestProcessor, ExecutorService> pair = new Pair<RequestProcessor, ExecutorService>(processor, executor); + if (!this.processorTables.containsKey(requestCode)) { + this.processorTables.put(requestCode, pair); + } + } + + @Override + public void registerRequestProcessor(String requestCode, RequestProcessor processor) { + this.registerRequestProcessor(requestCode, processor, publicExecutor); + } + + @Override + public void unregisterRequestProcessor(String requestCode) { + this.processorTables.remove(requestCode); + } + + @Override + public String remotingInstanceId() { + return this.getRemotingInstanceId(); + } + + @Override + public void registerInterceptor(Interceptor interceptor) { + this.interceptorGroup.registerInterceptor(interceptor); + } + + @Override + public void registerChannelEventListener(ChannelEventListener listener) { + this.channelEventListenerGroup.registerChannelEventListener(listener); + } + + @Override + public Pair<RequestProcessor, ExecutorService> processor(String requestCode) { + return processorTables.get(requestCode); + } + + protected String extractRemoteAddress(Channel channel) { + return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress(); + } + + class ChannelEventExecutor extends Thread { + private final static int MAX_SIZE = 10000; + private final LinkedBlockingQueue<NettyChannelEvent> eventQueue = new LinkedBlockingQueue<NettyChannelEvent>(); + private String name; + + public ChannelEventExecutor(String nettyEventExector) { + super(nettyEventExector); + this.name = nettyEventExector; + } + //private final AtomicBoolean isStopped = new AtomicBoolean(true); + + public void putNettyEvent(final NettyChannelEvent event) { + if (this.eventQueue.size() <= MAX_SIZE) { + this.eventQueue.add(event); + } else { + LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); + } + } + + @Override + public void run() { + LOG.info(this.name + " service started"); + + ChannelEventListenerGroup listener = NettyRemotingAbstract.this.channelEventListenerGroup; + + while (true) { + try { + NettyChannelEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS); + if (event != null && listener != null) { + RemotingChannel channel = new NettyChannelImpl(event.getChannel()); + + LOG.warn("Channel Event, {}", event); + + switch (event.getType()) { + case IDLE: + listener.onChannelIdle(channel); + break; + case INACTIVE: + listener.onChannelClose(channel); + break; + case ACTIVE: + listener.onChannelConnect(channel); + break; + case EXCEPTION: + listener.onChannelException(channel); + break; + default: + break; + } + } + } catch (Exception e) { + LOG.error("error", e); + break; + } + } + } + + } + + protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { + processMessageReceived(ctx, msg); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java new file mode 100644 index 0000000..7481574 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http2.Http2SecurityUtil; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.SupportedCipherSuiteFilter; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.net.ssl.SSLException; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingClient; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.command.TrafficType; +import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException; +import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; +import org.apache.rocketmq.remoting.api.protocol.Protocol; +import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta; +import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.external.ThreadUtils; +import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; +import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; +import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler; +import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler; +import org.apache.rocketmq.remoting.internal.JvmUtils; + +public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { + private static final long LOCK_TIMEOUT_MILLIS = 3000; + private final Bootstrap clientBootstrap = new Bootstrap(); + private final EventLoopGroup ioGroup; + private final Class<? extends SocketChannel> socketChannelClass; + + private final RemotingConfig clientConfig; + + private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); + private final Lock lockChannelTables = new ReentrantLock(); + private EventExecutorGroup workerGroup; + private SslContext sslContext; + + NettyRemotingClient(final RemotingConfig clientConfig) { + super(clientConfig, new RemotingCommandFactoryMeta(clientConfig.getProtocolName(), clientConfig.getSerializerName())); + this.clientConfig = clientConfig; + + if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) { + this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", + clientConfig.getClientWorkerThreads())); + socketChannelClass = EpollSocketChannel.class; + } else { + this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads", + clientConfig.getClientWorkerThreads())); + socketChannelClass = NioSocketChannel.class; + } + + this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(), + ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); + + if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) { + buildSslContext(); + } + } + + private void applyOptions(Bootstrap bootstrap) { + if (null != clientConfig) { + if (clientConfig.getTcpSoLinger() > 0) { + bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger()); + } + + if (clientConfig.getTcpSoSndBufSize() > 0) { + bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize()); + } + if (clientConfig.getTcpSoRcvBufSize() > 0) { + bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize()); + } + + bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()). + option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()). + option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()). + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()). + option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(), + clientConfig.getWriteBufHighWaterMark())); + } + } + + @Override + public void start() { + super.start(); + + this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) { + ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()), Http2Handler.newHandler(false)); + } + ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(), + clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()), + new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler()); + } + }); + + applyOptions(clientBootstrap); + + startUpHouseKeepingService(); + } + + private void buildSslContext() { + SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK; + try { + sslContext = SslContextBuilder.forClient() + .sslProvider(provider) + /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification. + * Please refer to the HTTP/2 specification for cipher requirements. */ + .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } catch (SSLException e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + // try { + ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); + + for (ChannelWrapper cw : this.channelTables.values()) { + this.closeChannel(null, cw.getChannel()); + } + + this.channelTables.clear(); + + this.ioGroup.shutdownGracefully(); + + ThreadUtils.shutdownGracefully(channelEventExecutor); + + this.workerGroup.shutdownGracefully(); + /* + } catch (Exception e) { + LOG.error("RemotingClient stopped error !", e); + } + */ + + super.stop(); + } + + private void closeChannel(final String addr, final Channel channel) { + if (null == channel) + return; + + final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr; + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean removeItemFromTable = true; + ChannelWrapper prevCW = this.channelTables.get(addrRemote); + //Workaround for null + if (null == prevCW) { + return; + } + + LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW); + + if (prevCW.getChannel() != channel) { + LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel); + removeItemFromTable = false; + } + + if (removeItemFromTable) { + this.channelTables.remove(addrRemote); + LOG.info("Channel {} has been removed !", addrRemote); + } + + channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + LOG.warn("Close channel {} {}", channel, future.isSuccess()); + } + }); + } catch (Exception e) { + LOG.error("Close channel error !", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + LOG.error("Close channel error !", e); + } + } + + private Channel createIfAbsent(final String addr) { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isActive()) { + return cw.getChannel(); + } + return this.createChannel(addr); + } + + //FIXME need test to verify + private Channel createChannel(final String addr) { + ChannelWrapper cw = null; + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean createNewConnection; + cw = this.channelTables.get(addr); + if (cw != null) { + if (cw.isActive()) { + return cw.getChannel(); + } else if (!cw.getChannelFuture().isDone()) { + createNewConnection = false; + } else { + this.channelTables.remove(addr); + createNewConnection = true; + } + } else { + createNewConnection = true; + } + + if (createNewConnection) { + String[] s = addr.split(":"); + SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1])); + ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress); + LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr); + cw = new ChannelWrapper(channelFuture); + this.channelTables.put(addr, cw); + } + } catch (Exception e) { + LOG.error("createChannel: create channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + if (cw != null) { + ChannelFuture channelFuture = cw.getChannelFuture(); + if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) { + if (cw.isActive()) { + LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); + return cw.getChannel(); + } else { + LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause()); + this.closeChannel(addr, cw.getChannel()); + } + } else { + LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(), + channelFuture.toString()); + this.closeChannel(addr, cw.getChannel()); + } + } + return null; + } + + private void closeChannel(final Channel channel) { + if (null == channel) + return; + + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean removeItemFromTable = true; + ChannelWrapper prevCW = null; + String addrRemote = null; + + for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) { + ChannelWrapper prev = entry.getValue(); + if (prev.getChannel() != null) { + if (prev.getChannel() == channel) { + prevCW = prev; + addrRemote = entry.getKey(); + break; + } + } + } + + if (null == prevCW) { + LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote); + removeItemFromTable = false; + } + + if (removeItemFromTable) { + this.channelTables.remove(addrRemote); + LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); + //RemotingHelper.closeChannel(channel); + } + } catch (Exception e) { + LOG.error("closeChannel: close the channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + LOG.error("closeChannel exception", e); + } + } + + @Override + public RemotingCommand invoke(final String address, final RemotingCommand request, final long timeoutMillis) { + request.trafficType(TrafficType.REQUEST_SYNC); + + Channel channel = this.createIfAbsent(address); + if (channel != null && channel.isActive()) { + try { + return this.invokeWithInterceptor(channel, request, timeoutMillis); + + } catch (RemoteTimeoutException e) { + if (this.clientConfig.isClientCloseSocketIfTimeout()) { + LOG.warn("invoke: timeout, so close the socket {} ms, {}", timeoutMillis, address); + this.closeChannel(address, channel); + } + + LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address); + throw e; + } finally { + /* + if (this.clientConfig.isClientShortConnectionEnable()) { + this.closeChannel(addr, channel); + } + */ + } + } else { + this.closeChannel(address, channel); + throw new RemoteConnectFailureException(address); + } + + } + + @Override + public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler, + final long timeoutMillis) { + + final Channel channel = this.createIfAbsent(address); + if (channel != null && channel.isActive()) { + // We support Netty's channel-level backpressure thereby respecting slow receivers on the other side. + if (!channel.isWritable()) { + // Note: It's up to the layer above a transport to decide whether or not to requeue a canceled write. + LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable()); + } + this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis); + } else { + this.closeChannel(address, channel); + } + } + + @Override + public void invokeOneWay(final String address, final RemotingCommand request, final long timeoutMillis) { + final Channel channel = this.createIfAbsent(address); + if (channel != null && channel.isActive()) { + if (!channel.isWritable()) { + //if (this.clientConfig.isSocketFlowControl()) { + LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable()); + //throw new ServiceInvocationFailureException(String.format("Channel[%s] is not writable now", channel.toString())); + } + this.invokeOnewayWithInterceptor(channel, request, timeoutMillis); + } else { + this.closeChannel(address, channel); + } + } + + private class ChannelWrapper { + private final ChannelFuture channelFuture; + + ChannelWrapper(ChannelFuture channelFuture) { + this.channelFuture = channelFuture; + } + + boolean isActive() { + return this.channelFuture.channel() != null && this.channelFuture.channel().isActive(); + } + + boolean isWriteable() { + return this.channelFuture.channel().isWritable(); + } + + private Channel getChannel() { + return this.channelFuture.channel(); + } + + ChannelFuture getChannelFuture() { + return channelFuture; + } + } + + private class ClientConnectionHandler extends ChannelDuplexHandler { + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(), + ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable()); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, + ChannelPromise promise) + throws Exception { + LOG.info("Connected from {} to {}.", localAddress, remoteAddress); + super.connect(ctx, remoteAddress, localAddress, promise); + + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel())); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + LOG.info("Remote address {} disconnect channel {}.", ctx.channel().remoteAddress(), ctx.channel()); + + closeChannel(ctx.channel()); + + super.disconnect(ctx, promise); + + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel())); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + LOG.info("Remote address {} close channel {}.", ctx.channel().remoteAddress(), ctx.channel()); + + closeChannel(ctx.channel()); + + super.close(ctx, promise); + + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel())); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state().equals(IdleState.ALL_IDLE)) { + LOG.info("Close channel {} because of idle event {} ", ctx.channel(), event); + closeChannel(ctx.channel()); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel())); + } + } + + ctx.fireUserEventTriggered(evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.info("Close channel {} because of error {} ", ctx.channel(), cause); + + closeChannel(ctx.channel()); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel())); + } + } +}