This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f1c5d53ef1d branch-3.1: [opt](proxy-protocol) Support connecting to 
proxy protocol enabled cluster without proxy protocol header (#47776) (#51940)
f1c5d53ef1d is described below

commit f1c5d53ef1d311d7dbc379fa8b0ee6bf00628432
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Jun 19 19:34:30 2025 +0800

    branch-3.1: [opt](proxy-protocol) Support connecting to proxy protocol 
enabled cluster without proxy protocol header (#47776) (#51940)
    
    bp #47776
---
 .../org/apache/doris/mysql/AcceptListener.java     |  7 +++--
 .../java/org/apache/doris/mysql/BytesChannel.java  |  4 +++
 .../java/org/apache/doris/mysql/MysqlChannel.java  | 13 +++++++++
 .../apache/doris/mysql/ProxyProtocolHandler.java   | 24 +++++++++++++----
 .../apache/doris/qe/ProxyProtocolHandlerTest.java  | 31 +++++++++++++++++++---
 5 files changed, 69 insertions(+), 10 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 2614aeb2a8e..41cc416f585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -20,6 +20,7 @@ package org.apache.doris.mysql;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
+import org.apache.doris.mysql.ProxyProtocolHandler.ProtocolType;
 import org.apache.doris.mysql.ProxyProtocolHandler.ProxyProtocolResult;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectProcessor;
@@ -78,11 +79,13 @@ public class AcceptListener implements 
ChannelListener<AcceptingChannel<StreamCo
                         if (Config.enable_proxy_protocol) {
                             ProxyProtocolResult result = 
ProxyProtocolHandler.handle(context.getMysqlChannel());
                             Preconditions.checkNotNull(result);
-                            if (!result.isUnknown) {
+                            ProtocolType pType = result.pType;
+                            if (pType == ProtocolType.PROTOCOL_WITH_IP) {
                                 
context.getMysqlChannel().setRemoteAddr(result.sourceIP, result.sourcePort);
                             }
-                            // ignore the UNKNOWN, and just use IP from MySQL 
protocol.
+                            // For PROTOCOL_WITHOUT_IP, and just use IP from 
MySQL protocol.
                             // which is already set when creating MysqlChannel.
+                            // For NOT_PROXY_PROTOCOL, just ignore to let 
connection with no proxy protocol in.
                         }
 
                         // authenticate check failed.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java
index bf97ae8068d..6f0aec95d4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java
@@ -26,4 +26,8 @@ public interface BytesChannel {
      * @return number of bytes read
      */
     public int read(ByteBuffer buffer);
+
+    default int testReadWithTimeout(ByteBuffer buffer, long timeoutMs) {
+        return read(buffer);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index aa640c57eb7..13da84e67c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -253,6 +253,19 @@ public class MysqlChannel implements BytesChannel {
         return readLen;
     }
 
+    @Override
+    public int testReadWithTimeout(ByteBuffer dstBuf, long timeoutMs) {
+        Preconditions.checkArgument(dstBuf.remaining() == 1, 
dstBuf.remaining());
+        try {
+            return Channels.readBlocking(conn.getSourceChannel(), dstBuf, 
timeoutMs, TimeUnit.MILLISECONDS);
+        } catch (IOException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Read channel exception, ignore.", e);
+            }
+            return -1;
+        }
+    }
+
     protected void decryptData(ByteBuffer dstBuf, boolean isHeader) throws 
SSLException {
         // after decrypt, we get a mysql packet with mysql header.
         if (!isSslMode || isHeader) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java
index 0f52a05286e..0d9d1508f3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java
@@ -49,12 +49,18 @@ public class ProxyProtocolHandler {
     private static final String TCP4 = "TCP4";
     private static final String TCP6 = "TCP6";
 
+    public enum ProtocolType {
+        PROTOCOL_WITH_IP, // protocol with source ip
+        PROTOCOL_WITHOUT_IP, // v2 protocol without source ip
+        NOT_PROXY_PROTOCOL  // not proxy protocol
+    }
+
     public static class ProxyProtocolResult {
         public String sourceIP = null;
         public int sourcePort = -1;
         public String destIp = null;
         public int destPort = -1;
-        public boolean isUnknown = false;
+        public ProtocolType pType = ProtocolType.PROTOCOL_WITH_IP;
 
         @Override
         public String toString() {
@@ -63,7 +69,7 @@ public class ProxyProtocolHandler {
                     + ", sourcePort=" + sourcePort
                     + ", destIp='" + destIp + '\''
                     + ", destPort=" + destPort
-                    + ", isUnknown=" + isUnknown
+                    + ", pType=" + pType
                     + '}';
         }
     }
@@ -71,10 +77,18 @@ public class ProxyProtocolHandler {
     public static ProxyProtocolResult handle(BytesChannel channel) throws 
IOException {
         // First read 1 byte to see if it is V1 or V2
         ByteBuffer buffer = ByteBuffer.allocate(1);
-        int readLen = channel.read(buffer);
-        if (readLen != 1) {
+        int readLen = channel.testReadWithTimeout(buffer, 10);
+        if (readLen == -1) {
+            throw new IOException("Remote peer closed the channel, ignore.");
+        } else if (readLen == 0) {
+            // 0 means remote peer does not send proxy protocol content.
+            ProxyProtocolResult result = new ProxyProtocolResult();
+            result.pType = ProtocolType.NOT_PROXY_PROTOCOL;
+            return result;
+        } else if (readLen != 1) {
             throw new IOException("Invalid proxy protocol, expect incoming 
bytes first");
         }
+
         buffer.flip();
         byte firstByte = buffer.get();
         if ((char) firstByte == V1_HEADER[0]) {
@@ -120,7 +134,7 @@ public class ProxyProtocolHandler {
                         throw new ProtocolException("Invalid proxy protocol 
v1. '\\r' is not found before '\\n'",
                                 debugInfo.toString());
                     }
-                    result.isUnknown = true;
+                    result.pType = ProtocolType.PROTOCOL_WITHOUT_IP;
                     return result;
                 } else if (carriageFound) {
                     throw new ProtocolException("Invalid proxy protocol v1. "
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java
index 13ad0b67d92..533c5ad9b46 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java
@@ -19,23 +19,30 @@ package org.apache.doris.qe;
 
 import org.apache.doris.mysql.BytesChannel;
 import org.apache.doris.mysql.ProxyProtocolHandler;
+import org.apache.doris.mysql.ProxyProtocolHandler.ProtocolType;
 
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class ProxyProtocolHandlerTest {
 
     public static class TestChannel implements BytesChannel {
         private byte[] data;
         private int pos;
+        private int testReadReturn = 1;
 
         public TestChannel(byte[] data) {
             this.data = data;
             this.pos = 0;
         }
 
+        public void setTestReadReturn(int testReadReturn) {
+            this.testReadReturn = testReadReturn;
+        }
+
         @Override
         public int read(java.nio.ByteBuffer buffer) {
             int len = Math.min(buffer.remaining(), data.length - pos);
@@ -45,6 +52,15 @@ public class ProxyProtocolHandlerTest {
             }
             return len;
         }
+
+        @Override
+        public int testReadWithTimeout(ByteBuffer buffer, long timeoutMs) {
+            if (testReadReturn == 1) {
+                return read(buffer);
+            } else {
+                return testReadReturn;
+            }
+        }
     }
 
     private TestChannel testChannel;
@@ -55,7 +71,7 @@ public class ProxyProtocolHandlerTest {
         testChannel = new TestChannel(data);
         ProxyProtocolHandler.ProxyProtocolResult result = 
ProxyProtocolHandler.handle(testChannel);
         Assertions.assertNotNull(result);
-        Assertions.assertFalse(result.isUnknown);
+        Assertions.assertEquals(ProtocolType.PROTOCOL_WITH_IP, result.pType);
         Assertions.assertEquals("192.168.0.1", result.sourceIP);
         Assertions.assertEquals(12345, result.sourcePort);
         Assertions.assertEquals("192.168.0.2", result.destIp);
@@ -68,7 +84,7 @@ public class ProxyProtocolHandlerTest {
         testChannel = new TestChannel(data);
         ProxyProtocolHandler.ProxyProtocolResult result = 
ProxyProtocolHandler.handle(testChannel);
         Assertions.assertNotNull(result);
-        Assertions.assertTrue(result.isUnknown);
+        Assertions.assertEquals(ProtocolType.PROTOCOL_WITHOUT_IP, 
result.pType);
     }
 
     @Test(expected = IOException.class)
@@ -105,13 +121,22 @@ public class ProxyProtocolHandlerTest {
         testChannel = new TestChannel(data);
         ProxyProtocolHandler.ProxyProtocolResult result = 
ProxyProtocolHandler.handle(testChannel);
         Assertions.assertNotNull(result);
-        Assertions.assertFalse(result.isUnknown);
+        Assertions.assertEquals(ProtocolType.PROTOCOL_WITH_IP, result.pType);
         Assertions.assertEquals("2001:db8:0:1:1:1:1:1", result.sourceIP);
         Assertions.assertEquals(12345, result.sourcePort);
         Assertions.assertEquals("2001:db8:0:1:1:1:1:2", result.destIp);
         Assertions.assertEquals(54321, result.destPort);
     }
 
+    @Test
+    public void handleNotProxyProtocol() throws IOException {
+        byte[] data = new byte[] {};
+        testChannel = new TestChannel(data);
+        testChannel.setTestReadReturn(0);
+        ProxyProtocolHandler.ProxyProtocolResult result = 
ProxyProtocolHandler.handle(testChannel);
+        Assertions.assertEquals(ProtocolType.NOT_PROXY_PROTOCOL, result.pType);
+    }
+
     @Test(expected = IOException.class)
     public void handleV1ProtocolWithInvalidIPv6Data() throws IOException {
         byte[] data = "PROXY TCP6 2001:db8:0:1:1:1:1:1 2001:db8:0:1:1:1:1:2 
12345 EXTRA DATA\r\n".getBytes();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to