This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 6d85688cc4 add qos protocol to pu server (#10395)
6d85688cc4 is described below
commit 6d85688cc4e696cebe23218aa6b2703b59fd1ecf
Author: TrueAbc <[email protected]>
AuthorDate: Mon Aug 15 09:47:39 2022 +0800
add qos protocol to pu server (#10395)
* add qos to pu server
* add qos enable flag set
* null pointer Exception of mock url
* revoke pu actions to connection establishment
* telnet detector adjust
* frameworkModel already exists
* comment code style
* qos comments fix
* add telnet ayt detect
* change func name to lower camel case
* frameworkModel inject in Constructor
* frameworkModel inject in Constructor
---
.../dubbo/qos/protocol/QosProtocolWrapper.java | 6 ++
.../java/org/apache/dubbo/qos/pu/QosDetector.java | 56 +++++++++++++
.../org/apache/dubbo/qos/pu/QosHTTP1Detector.java | 39 +++++++++
.../org/apache/dubbo/qos/pu/QosWireProtocol.java | 63 ++++++++++++++
.../org/apache/dubbo/qos/pu/TelnetDetector.java | 95 ++++++++++++++++++++++
.../org.apache.dubbo.remoting.api.WireProtocol | 1 +
.../netty4/NettyPortUnificationServerHandler.java | 6 +-
.../transport/netty4/NettyServerHandler.java | 2 +
8 files changed, 266 insertions(+), 2 deletions(-)
diff --git
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
index f0e420cc5b..5f4651d471 100644
---
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
+++
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
@@ -21,7 +21,9 @@ import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.qos.common.QosConstants;
+import org.apache.dubbo.qos.pu.QosWireProtocol;
import org.apache.dubbo.qos.server.Server;
+import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
@@ -96,6 +98,10 @@ public class QosProtocolWrapper implements Protocol,
ScopeModelAware {
}
boolean qosEnable = url.getParameter(QOS_ENABLE, true);
+ WireProtocol qosWireProtocol =
frameworkModel.getExtensionLoader(WireProtocol.class).getExtension("qos");
+ if(qosWireProtocol != null) {
+ ((QosWireProtocol) qosWireProtocol).setQosEnable(qosEnable);
+ }
if (!qosEnable) {
logger.info("qos won't be started because it is disabled. " +
"Please check dubbo.application.qos.enable is configured
either in system property, " +
diff --git
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosDetector.java
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosDetector.java
new file mode 100644
index 0000000000..6a91405352
--- /dev/null
+++
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosDetector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+/**
+ * Protocol detector for QoS traffic on a shared port.
+ * <p>
+ * Delegates to {@link QosHTTP1Detector} first and to {@link TelnetDetector} second.
+ * When QoS is switched off through {@link #setQosEnableFlag(boolean)}, nothing is recognized.
+ */
+public class QosDetector implements ProtocolDetector {
+
+    private final QosHTTP1Detector qosHTTP1Detector = new QosHTTP1Detector();
+    private final TelnetDetector telnetDetector;
+    // volatile: the flag is written through the setter and read on every detect() call;
+    // presumably those happen on different threads (configuration vs. I/O) - confirm with callers.
+    private volatile boolean qosEnableFlag = true;
+
+    /**
+     * @param frameworkModel model handed to the telnet detector it creates
+     */
+    public QosDetector(FrameworkModel frameworkModel) {
+        this.telnetDetector = new TelnetDetector(frameworkModel);
+    }
+
+    /**
+     * Enables or disables QoS detection.
+     *
+     * @param qosEnableFlag {@code false} makes {@link #detect(ChannelBuffer)} always answer UNRECOGNIZED
+     */
+    public void setQosEnableFlag(boolean qosEnableFlag) {
+        this.qosEnableFlag = qosEnableFlag;
+    }
+
+    /**
+     * Runs the HTTP/1 detector, then the telnet detector.
+     *
+     * @param in buffer holding the bytes received so far
+     * @return RECOGNIZED as soon as one delegate recognizes the input, NEED_MORE_DATA if at least
+     *         one delegate asked for more data, UNRECOGNIZED otherwise (or when QoS is disabled)
+     */
+    @Override
+    public Result detect(ChannelBuffer in) {
+        if (!qosEnableFlag) {
+            return Result.UNRECOGNIZED;
+        }
+        Result h1Res = qosHTTP1Detector.detect(in);
+        if (h1Res.equals(Result.RECOGNIZED)) {
+            return h1Res;
+        }
+        // the telnet detector is consulted even if the HTTP/1 detector only asked for more data
+        Result telRes = telnetDetector.detect(in);
+        if (telRes.equals(Result.RECOGNIZED)) {
+            return telRes;
+        }
+        if (h1Res.equals(Result.NEED_MORE_DATA) || telRes.equals(Result.NEED_MORE_DATA)) {
+            return Result.NEED_MORE_DATA;
+        }
+        return Result.UNRECOGNIZED;
+    }
+
+}
diff --git
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosHTTP1Detector.java
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosHTTP1Detector.java
new file mode 100644
index 0000000000..9a62b841b1
--- /dev/null
+++
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosHTTP1Detector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+
+public class QosHTTP1Detector implements ProtocolDetector {
+ private static boolean isHttp(int magic) {
+ return magic == 'G' || magic == 'P';
+ }
+
+ @Override
+ public Result detect(ChannelBuffer in) {
+ if (in.readableBytes() < 2) {
+ return Result.NEED_MORE_DATA;
+ }
+ final int magic = in.getByte(in.readerIndex());
+ // h2 starts with "PR"
+ if (isHttp(magic) && in.getByte(in.readerIndex()+1) != 'R' ){
+ return Result.RECOGNIZED;
+ }
+ return Result.UNRECOGNIZED;
+ }
+}
diff --git
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
new file mode 100644
index 0000000000..71e6633eb3
--- /dev/null
+++
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.qos.server.DubboLogo;
+import org.apache.dubbo.qos.server.handler.QosProcessHandler;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.api.AbstractWireProtocol;
+import org.apache.dubbo.remoting.api.pu.ChannelHandlerPretender;
+import org.apache.dubbo.remoting.api.pu.ChannelOperator;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ScopeModelAware;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wire protocol registered under the name "qos"; its detector is a {@link QosDetector}.
+ */
+@Activate
+public class QosWireProtocol extends AbstractWireProtocol implements ScopeModelAware {
+
+    /**
+     * @param frameworkModel model passed on to the {@link QosDetector} backing this protocol
+     */
+    public QosWireProtocol(FrameworkModel frameworkModel) {
+        super(new QosDetector(frameworkModel));
+    }
+
+    /**
+     * Forwards the QoS enable switch to the underlying {@link QosDetector}.
+     *
+     * @param flag value handed to {@link QosDetector#setQosEnableFlag(boolean)}
+     */
+    public void setQosEnable(boolean flag) {
+        QosDetector qosDetector = (QosDetector) this.detector();
+        qosDetector.setQosEnableFlag(flag);
+    }
+
+    /**
+     * Installs a single {@link QosProcessHandler}, wrapped in a {@link ChannelHandlerPretender},
+     * through the given operator.
+     *
+     * @param url      supplies the framework model used to build the handler
+     * @param operator receives the handler list
+     */
+    @Override
+    public void configServerProtocolHandler(URL url, ChannelOperator operator) {
+        List<ChannelHandler> handlers = new ArrayList<>();
+        // add qosProcess handler
+        handlers.add(new ChannelHandlerPretender(
+            new QosProcessHandler(url.getOrDefaultFrameworkModel(), DubboLogo.DUBBO, false)));
+        operator.configChannelHandler(handlers);
+    }
+
+    /**
+     * Does nothing: this protocol configures no client pipeline.
+     */
+    @Override
+    public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) {
+
+    }
+
+}
diff --git
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/TelnetDetector.java
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/TelnetDetector.java
new file mode 100644
index 0000000000..84199937d4
--- /dev/null
+++
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/TelnetDetector.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.qos.command.BaseCommand;
+import org.apache.dubbo.qos.command.CommandContext;
+import org.apache.dubbo.qos.command.decoder.TelnetCommandDecoder;
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.buffer.HeapChannelBuffer;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+import io.netty.util.CharsetUtil;
+
+import static java.lang.Math.min;
+
+
+/**
+ * Recognizes telnet-style QoS input: either text whose first token names a registered
+ * {@link BaseCommand} extension, or the two-byte telnet "ayt" preface (0xFF 0xF6).
+ */
+public class TelnetDetector implements ProtocolDetector {
+
+    /** Buffers with this many readable bytes or more are rejected outright. */
+    private static final int MAX_SIZE = 2048;
+    /** Telnet "ayt" preface; only ever read here, never modified. */
+    private static final ChannelBuffer AYT_PREFACE =
+        new HeapChannelBuffer(new byte[]{(byte) 0xff, (byte) 0xf6});
+
+    private final FrameworkModel frameworkModel;
+
+    /**
+     * @param frameworkModel model whose extension loader is queried for QoS commands
+     */
+    public TelnetDetector(FrameworkModel frameworkModel) {
+        this.frameworkModel = frameworkModel;
+    }
+
+    /**
+     * Checks for a QoS command first and for the telnet ayt preface second.
+     *
+     * @param in buffer holding the bytes received so far; a complete ayt preface is consumed from it
+     * @return RECOGNIZED if either check matches, UNRECOGNIZED if the buffer is too large or both
+     *         checks reject it, NEED_MORE_DATA otherwise
+     */
+    @Override
+    public Result detect(ChannelBuffer in) {
+        if (in.readableBytes() >= MAX_SIZE) {
+            return Result.UNRECOGNIZED;
+        }
+        Result resCommand = commandDetect(in);
+        if (resCommand.equals(Result.RECOGNIZED)) {
+            return resCommand;
+        }
+        Result resAyt = telnetAytDetect(in);
+        if (resAyt.equals(Result.RECOGNIZED)) {
+            return resAyt;
+        }
+        if (resAyt.equals(Result.UNRECOGNIZED) && resCommand.equals(Result.UNRECOGNIZED)) {
+            return Result.UNRECOGNIZED;
+        }
+        return Result.NEED_MORE_DATA;
+    }
+
+    /**
+     * Detects whether the remote channel sent a QoS command.
+     *
+     * @return RECOGNIZED when the decoded command name is a registered {@link BaseCommand}
+     *         extension, UNRECOGNIZED otherwise
+     */
+    private Result commandDetect(ChannelBuffer in) {
+        // indexed read straight from the incoming buffer: no intermediate copy is needed
+        byte[] bytes = new byte[in.readableBytes()];
+        in.getBytes(in.readerIndex(), bytes);
+
+        // trim \r\n to let parser work for input
+        String line = new String(bytes, CharsetUtil.UTF_8).trim();
+        if (line.isEmpty()) {
+            // nothing but whitespace / line terminators: there is no command to look up
+            return Result.UNRECOGNIZED;
+        }
+        CommandContext commandContext = TelnetCommandDecoder.decode(line);
+        // defensive: the decoder's behaviour for input without a command token is not visible
+        // here - TODO confirm whether it can return null
+        if (commandContext == null) {
+            return Result.UNRECOGNIZED;
+        }
+        String commandName = commandContext.getCommandName();
+        if (commandName == null || commandName.isEmpty()) {
+            return Result.UNRECOGNIZED;
+        }
+        if (frameworkModel.getExtensionLoader(BaseCommand.class).hasExtension(commandName)) {
+            return Result.RECOGNIZED;
+        }
+        return Result.UNRECOGNIZED;
+    }
+
+    /**
+     * Detects whether the remote channel sent a telnet ayt command.
+     *
+     * @return UNRECOGNIZED when the readable bytes do not match the preface, NEED_MORE_DATA when
+     *         only a proper prefix of it has arrived, RECOGNIZED (after consuming it) otherwise
+     */
+    private Result telnetAytDetect(ChannelBuffer in) {
+        int prefaceLen = AYT_PREFACE.readableBytes();
+        int bytesRead = min(in.readableBytes(), prefaceLen);
+        if (bytesRead == 0 || !ChannelBuffers.prefixEquals(in, AYT_PREFACE, bytesRead)) {
+            return Result.UNRECOGNIZED;
+        }
+        if (bytesRead == prefaceLen) {
+            // we need to consume preface because it's not a qos command
+            // consume and remember to mark, pu server handler reset reader index
+            in.readBytes(prefaceLen);
+            in.markReaderIndex();
+            return Result.RECOGNIZED;
+        }
+        return Result.NEED_MORE_DATA;
+    }
+
+}
diff --git
a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
new file mode 100644
index 0000000000..cd4d62dd5c
--- /dev/null
+++
b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
@@ -0,0 +1 @@
+qos=org.apache.dubbo.qos.pu.QosWireProtocol
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index f30f840496..7605798c67 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -77,7 +77,8 @@ public class NettyPortUnificationServerHandler extends
ByteToMessageDecoder {
throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(),
url, handler);
// Will use the first five bytes to detect a protocol.
- if (in.readableBytes() < 5) {
+ // size of telnet command ls is 2 bytes
+ if (in.readableBytes() < 2) {
return;
}
@@ -127,7 +128,8 @@ public class NettyPortUnificationServerHandler extends
ByteToMessageDecoder {
}
private boolean isSsl(ByteBuf buf) {
- if (detectSsl) {
+ // at least 5 bytes to determine if data is encrypted
+ if (detectSsl && buf.readableBytes() >= 5) {
return SslHandler.isEncrypted(buf);
}
return false;
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
index 8eaeb9457c..7ea3733120 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
@@ -95,6 +95,8 @@ public class NettyServerHandler extends ChannelDuplexHandler {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(),
url, handler);
handler.received(channel, msg);
+ // trigger qos handler
+ ctx.fireChannelRead(msg);
}