This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 6d85688cc4 add qos protocol to pu server (#10395)
6d85688cc4 is described below

commit 6d85688cc4e696cebe23218aa6b2703b59fd1ecf
Author: TrueAbc <[email protected]>
AuthorDate: Mon Aug 15 09:47:39 2022 +0800

    add qos protocol to pu server (#10395)
    
    * add qos to pu server
    
    * add qos enable flag set
    
    * null pointer Exception of mock url
    
    * revoke pu actions to connection establishment
    
    * telnet detector adjust
    
    * frameworkModel already exists
    
    * comment code style
    
    * qos comments fix
    
    * add telnet ayt detect
    
    * change func name to lower camel case
    
    * frameworkModel inject in Constructor
    
    * frameworkModel inject in Constructor
---
 .../dubbo/qos/protocol/QosProtocolWrapper.java     |  6 ++
 .../java/org/apache/dubbo/qos/pu/QosDetector.java  | 56 +++++++++++++
 .../org/apache/dubbo/qos/pu/QosHTTP1Detector.java  | 39 +++++++++
 .../org/apache/dubbo/qos/pu/QosWireProtocol.java   | 63 ++++++++++++++
 .../org/apache/dubbo/qos/pu/TelnetDetector.java    | 95 ++++++++++++++++++++++
 .../org.apache.dubbo.remoting.api.WireProtocol     |  1 +
 .../netty4/NettyPortUnificationServerHandler.java  |  6 +-
 .../transport/netty4/NettyServerHandler.java       |  2 +
 8 files changed, 266 insertions(+), 2 deletions(-)

diff --git 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
index f0e420cc5b..5f4651d471 100644
--- 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
+++ 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/protocol/QosProtocolWrapper.java
@@ -21,7 +21,9 @@ import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.qos.common.QosConstants;
+import org.apache.dubbo.qos.pu.QosWireProtocol;
 import org.apache.dubbo.qos.server.Server;
+import org.apache.dubbo.remoting.api.WireProtocol;
 import org.apache.dubbo.rpc.Exporter;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Protocol;
@@ -96,6 +98,10 @@ public class QosProtocolWrapper implements Protocol, 
ScopeModelAware {
             }
 
             boolean qosEnable = url.getParameter(QOS_ENABLE, true);
+            WireProtocol qosWireProtocol = 
frameworkModel.getExtensionLoader(WireProtocol.class).getExtension("qos");
+            if(qosWireProtocol != null) {
+                ((QosWireProtocol) qosWireProtocol).setQosEnable(qosEnable);
+            }
             if (!qosEnable) {
                 logger.info("qos won't be started because it is disabled. " +
                     "Please check dubbo.application.qos.enable is configured 
either in system property, " +
diff --git 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosDetector.java 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosDetector.java
new file mode 100644
index 0000000000..6a91405352
--- /dev/null
+++ 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosDetector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+public class QosDetector implements ProtocolDetector {
+
+    private final QosHTTP1Detector qosHTTP1Detector = new QosHTTP1Detector();
+    private final TelnetDetector telnetDetector;
+    private boolean QosEnableFlag = true;
+
+    public void setQosEnableFlag(boolean qosEnableFlag) {
+        QosEnableFlag = qosEnableFlag;
+    }
+
+    public QosDetector(FrameworkModel frameworkModel) {
+        this.telnetDetector = new TelnetDetector(frameworkModel);
+    }
+
+    @Override
+    public Result detect(ChannelBuffer in) {
+        if(!QosEnableFlag) {
+            return Result.UNRECOGNIZED;
+        }
+        Result h1Res = qosHTTP1Detector.detect(in);
+        if(h1Res.equals(Result.RECOGNIZED)) {
+            return h1Res;
+        }
+        Result telRes = telnetDetector.detect(in);
+        if(telRes.equals(Result.RECOGNIZED)) {
+            return telRes;
+        }
+        if(h1Res.equals(Result.NEED_MORE_DATA) || 
telRes.equals(Result.NEED_MORE_DATA)) {
+            return Result.NEED_MORE_DATA;
+        }
+        return Result.UNRECOGNIZED;
+    }
+
+}
diff --git 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosHTTP1Detector.java
 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosHTTP1Detector.java
new file mode 100644
index 0000000000..9a62b841b1
--- /dev/null
+++ 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosHTTP1Detector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+
+public class QosHTTP1Detector implements ProtocolDetector {
+    private static boolean isHttp(int magic) {
+        return magic == 'G' || magic == 'P';
+    }
+
+    @Override
+    public Result detect(ChannelBuffer in) {
+        if (in.readableBytes() < 2) {
+            return Result.NEED_MORE_DATA;
+        }
+        final int magic = in.getByte(in.readerIndex());
+        // h2 starts with "PR"
+        if (isHttp(magic) && in.getByte(in.readerIndex()+1) != 'R' ){
+            return Result.RECOGNIZED;
+        }
+        return Result.UNRECOGNIZED;
+    }
+}
diff --git 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
new file mode 100644
index 0000000000..71e6633eb3
--- /dev/null
+++ 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.qos.server.DubboLogo;
+import org.apache.dubbo.qos.server.handler.QosProcessHandler;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.api.AbstractWireProtocol;
+import org.apache.dubbo.remoting.api.pu.ChannelHandlerPretender;
+import org.apache.dubbo.remoting.api.pu.ChannelOperator;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ScopeModelAware;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Activate
+public class QosWireProtocol extends AbstractWireProtocol implements 
ScopeModelAware {
+
+    public QosWireProtocol(FrameworkModel frameworkModel) {
+        super(new QosDetector(frameworkModel));
+    }
+
+    public void setQosEnable(boolean flag) {
+        ((QosDetector)this.detector()).setQosEnableFlag(flag);
+    }
+
+    @Override
+    public void configServerProtocolHandler(URL url, ChannelOperator operator) 
{
+        // add qosProcess handler
+        QosProcessHandler handler = new 
QosProcessHandler(url.getOrDefaultFrameworkModel(),
+            DubboLogo.DUBBO, false);
+        List<ChannelHandler> handlers = new ArrayList<>();
+        handlers.add(new ChannelHandlerPretender(handler));
+        operator.configChannelHandler(handlers);
+    }
+
+
+    @Override
+    public void configClientPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
+
+    }
+
+}
diff --git 
a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/TelnetDetector.java
 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/TelnetDetector.java
new file mode 100644
index 0000000000..84199937d4
--- /dev/null
+++ 
b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/TelnetDetector.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.pu;
+
+import org.apache.dubbo.qos.command.BaseCommand;
+import org.apache.dubbo.qos.command.CommandContext;
+import org.apache.dubbo.qos.command.decoder.TelnetCommandDecoder;
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+import org.apache.dubbo.remoting.buffer.HeapChannelBuffer;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+import io.netty.util.CharsetUtil;
+
+import static java.lang.Math.min;
+
+
+public class TelnetDetector implements ProtocolDetector {
+
+    private FrameworkModel frameworkModel;
+    private final int MaxSize = 2048;
+    private final ChannelBuffer AytPreface = new HeapChannelBuffer(new 
byte[]{(byte) 0xff, (byte) 0xf6});
+
+    public TelnetDetector(FrameworkModel frameworkModel) {
+        this.frameworkModel = frameworkModel;
+    }
+
+    @Override
+    public Result detect(ChannelBuffer in) {
+        if (in.readableBytes() >= MaxSize) {
+            return Result.UNRECOGNIZED;
+        }
+        Result resCommand = commandDetect(in);
+        if (resCommand.equals(Result.RECOGNIZED)){
+            return resCommand;
+        }
+        Result resAyt = telnetAytDetect(in);
+        if (resAyt.equals(Result.RECOGNIZED)) {
+            return resAyt;
+        }
+        if (resAyt.equals(Result.UNRECOGNIZED) && 
resCommand.equals(Result.UNRECOGNIZED)) {
+            return Result.UNRECOGNIZED;
+        }
+        return Result.NEED_MORE_DATA;
+    }
+
+    private Result commandDetect(ChannelBuffer in) {
+        // detect if remote channel send a qos command to server
+        ChannelBuffer back = in.copy();
+        byte[] backBytes = new byte[back.readableBytes()];
+        back.getBytes(back.readerIndex(), backBytes);
+
+        String s = new String(backBytes, CharsetUtil.UTF_8);
+        // trim /r/n to let parser work for input
+        s = s.trim();
+        CommandContext commandContext = TelnetCommandDecoder.decode(s);
+        
if(frameworkModel.getExtensionLoader(BaseCommand.class).hasExtension(commandContext.getCommandName())){
+            return Result.RECOGNIZED;
+        }
+        return Result.UNRECOGNIZED;
+    }
+
+    private Result telnetAytDetect(ChannelBuffer in) {
+        // detect if remote channel send a telnet ayt command to server
+        int prefaceLen = AytPreface.readableBytes();
+        int bytesRead = min(in.readableBytes(), prefaceLen);
+        if(bytesRead == 0 || !ChannelBuffers.prefixEquals(in, AytPreface, 
bytesRead)) {
+            return Result.UNRECOGNIZED;
+        }
+        if(bytesRead == prefaceLen) {
+            // we need to consume preface because it's not a qos command
+            // consume and remember to mark, pu server handler reset reader 
index
+            in.readBytes(AytPreface.readableBytes());
+            in.markReaderIndex();
+            return Result.RECOGNIZED;
+        }
+        return Result.NEED_MORE_DATA;
+    }
+
+}
diff --git 
a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
 
b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
new file mode 100644
index 0000000000..cd4d62dd5c
--- /dev/null
+++ 
b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
@@ -0,0 +1 @@
+qos=org.apache.dubbo.qos.pu.QosWireProtocol
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index f30f840496..7605798c67 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -77,7 +77,8 @@ public class NettyPortUnificationServerHandler extends 
ByteToMessageDecoder {
         throws Exception {
         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), 
url, handler);
         // Will use the first five bytes to detect a protocol.
-        if (in.readableBytes() < 5) {
+        // size of telnet command ls is 2 bytes
+        if (in.readableBytes() < 2) {
             return;
         }
 
@@ -127,7 +128,8 @@ public class NettyPortUnificationServerHandler extends 
ByteToMessageDecoder {
     }
 
     private boolean isSsl(ByteBuf buf) {
-        if (detectSsl) {
+        // at least 5 bytes to determine if data is encrypted
+        if (detectSsl && buf.readableBytes() >= 5) {
             return SslHandler.isEncrypted(buf);
         }
         return false;
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
index 8eaeb9457c..7ea3733120 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
@@ -95,6 +95,8 @@ public class NettyServerHandler extends ChannelDuplexHandler {
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), 
url, handler);
         handler.received(channel, msg);
+        // trigger qos handler
+        ctx.fireChannelRead(msg);
     }
 
 

Reply via email to