This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new f1c5d53ef1d branch-3.1: [opt](proxy-protocol) Support connecting to
proxy protocol enabled cluster without proxy protocol header (#47776) (#51940)
f1c5d53ef1d is described below
commit f1c5d53ef1d311d7dbc379fa8b0ee6bf00628432
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Jun 19 19:34:30 2025 +0800
branch-3.1: [opt](proxy-protocol) Support connecting to proxy protocol
enabled cluster without proxy protocol header (#47776) (#51940)
bp #47776
---
.../org/apache/doris/mysql/AcceptListener.java | 7 +++--
.../java/org/apache/doris/mysql/BytesChannel.java | 4 +++
.../java/org/apache/doris/mysql/MysqlChannel.java | 13 +++++++++
.../apache/doris/mysql/ProxyProtocolHandler.java | 24 +++++++++++++----
.../apache/doris/qe/ProxyProtocolHandlerTest.java | 31 +++++++++++++++++++---
5 files changed, 69 insertions(+), 10 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 2614aeb2a8e..41cc416f585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -20,6 +20,7 @@ package org.apache.doris.mysql;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
+import org.apache.doris.mysql.ProxyProtocolHandler.ProtocolType;
import org.apache.doris.mysql.ProxyProtocolHandler.ProxyProtocolResult;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
@@ -78,11 +79,13 @@ public class AcceptListener implements
ChannelListener<AcceptingChannel<StreamCo
if (Config.enable_proxy_protocol) {
ProxyProtocolResult result =
ProxyProtocolHandler.handle(context.getMysqlChannel());
Preconditions.checkNotNull(result);
- if (!result.isUnknown) {
+ ProtocolType pType = result.pType;
+ if (pType == ProtocolType.PROTOCOL_WITH_IP) {
context.getMysqlChannel().setRemoteAddr(result.sourceIP, result.sourcePort);
}
- // ignore the UNKNOWN, and just use IP from MySQL
protocol.
+ // For PROTOCOL_WITHOUT_IP, and just use IP from
MySQL protocol.
// which is already set when creating MysqlChannel.
+ // For NOT_PROXY_PROTOCOL, just ignore to let
connection with no proxy protocol in.
}
// authenticate check failed.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java
index bf97ae8068d..6f0aec95d4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/BytesChannel.java
@@ -26,4 +26,8 @@ public interface BytesChannel {
* @return number of bytes read
*/
public int read(ByteBuffer buffer);
+
+ default int testReadWithTimeout(ByteBuffer buffer, long timeoutMs) {
+ return read(buffer);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index aa640c57eb7..13da84e67c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -253,6 +253,19 @@ public class MysqlChannel implements BytesChannel {
return readLen;
}
+ @Override
+ public int testReadWithTimeout(ByteBuffer dstBuf, long timeoutMs) {
+ Preconditions.checkArgument(dstBuf.remaining() == 1,
dstBuf.remaining());
+ try {
+ return Channels.readBlocking(conn.getSourceChannel(), dstBuf,
timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Read channel exception, ignore.", e);
+ }
+ return -1;
+ }
+ }
+
protected void decryptData(ByteBuffer dstBuf, boolean isHeader) throws
SSLException {
// after decrypt, we get a mysql packet with mysql header.
if (!isSslMode || isHeader) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java
index 0f52a05286e..0d9d1508f3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyProtocolHandler.java
@@ -49,12 +49,18 @@ public class ProxyProtocolHandler {
private static final String TCP4 = "TCP4";
private static final String TCP6 = "TCP6";
+ public enum ProtocolType {
+ PROTOCOL_WITH_IP, // protocol with source ip
+ PROTOCOL_WITHOUT_IP, // v2 protocol without source ip
+ NOT_PROXY_PROTOCOL // not proxy protocol
+ }
+
public static class ProxyProtocolResult {
public String sourceIP = null;
public int sourcePort = -1;
public String destIp = null;
public int destPort = -1;
- public boolean isUnknown = false;
+ public ProtocolType pType = ProtocolType.PROTOCOL_WITH_IP;
@Override
public String toString() {
@@ -63,7 +69,7 @@ public class ProxyProtocolHandler {
+ ", sourcePort=" + sourcePort
+ ", destIp='" + destIp + '\''
+ ", destPort=" + destPort
- + ", isUnknown=" + isUnknown
+ + ", pType=" + pType
+ '}';
}
}
@@ -71,10 +77,18 @@ public class ProxyProtocolHandler {
public static ProxyProtocolResult handle(BytesChannel channel) throws
IOException {
// First read 1 byte to see if it is V1 or V2
ByteBuffer buffer = ByteBuffer.allocate(1);
- int readLen = channel.read(buffer);
- if (readLen != 1) {
+ int readLen = channel.testReadWithTimeout(buffer, 10);
+ if (readLen == -1) {
+ throw new IOException("Remote peer closed the channel, ignore.");
+ } else if (readLen == 0) {
+ // 0 means remote peer does not send proxy protocol content.
+ ProxyProtocolResult result = new ProxyProtocolResult();
+ result.pType = ProtocolType.NOT_PROXY_PROTOCOL;
+ return result;
+ } else if (readLen != 1) {
throw new IOException("Invalid proxy protocol, expect incoming
bytes first");
}
+
buffer.flip();
byte firstByte = buffer.get();
if ((char) firstByte == V1_HEADER[0]) {
@@ -120,7 +134,7 @@ public class ProxyProtocolHandler {
throw new ProtocolException("Invalid proxy protocol
v1. '\\r' is not found before '\\n'",
debugInfo.toString());
}
- result.isUnknown = true;
+ result.pType = ProtocolType.PROTOCOL_WITHOUT_IP;
return result;
} else if (carriageFound) {
throw new ProtocolException("Invalid proxy protocol v1. "
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java
index 13ad0b67d92..533c5ad9b46 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ProxyProtocolHandlerTest.java
@@ -19,23 +19,30 @@ package org.apache.doris.qe;
import org.apache.doris.mysql.BytesChannel;
import org.apache.doris.mysql.ProxyProtocolHandler;
+import org.apache.doris.mysql.ProxyProtocolHandler.ProtocolType;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import java.io.IOException;
+import java.nio.ByteBuffer;
public class ProxyProtocolHandlerTest {
public static class TestChannel implements BytesChannel {
private byte[] data;
private int pos;
+ private int testReadReturn = 1;
public TestChannel(byte[] data) {
this.data = data;
this.pos = 0;
}
+ public void setTestReadReturn(int testReadReturn) {
+ this.testReadReturn = testReadReturn;
+ }
+
@Override
public int read(java.nio.ByteBuffer buffer) {
int len = Math.min(buffer.remaining(), data.length - pos);
@@ -45,6 +52,15 @@ public class ProxyProtocolHandlerTest {
}
return len;
}
+
+ @Override
+ public int testReadWithTimeout(ByteBuffer buffer, long timeoutMs) {
+ if (testReadReturn == 1) {
+ return read(buffer);
+ } else {
+ return testReadReturn;
+ }
+ }
}
private TestChannel testChannel;
@@ -55,7 +71,7 @@ public class ProxyProtocolHandlerTest {
testChannel = new TestChannel(data);
ProxyProtocolHandler.ProxyProtocolResult result =
ProxyProtocolHandler.handle(testChannel);
Assertions.assertNotNull(result);
- Assertions.assertFalse(result.isUnknown);
+ Assertions.assertEquals(ProtocolType.PROTOCOL_WITH_IP, result.pType);
Assertions.assertEquals("192.168.0.1", result.sourceIP);
Assertions.assertEquals(12345, result.sourcePort);
Assertions.assertEquals("192.168.0.2", result.destIp);
@@ -68,7 +84,7 @@ public class ProxyProtocolHandlerTest {
testChannel = new TestChannel(data);
ProxyProtocolHandler.ProxyProtocolResult result =
ProxyProtocolHandler.handle(testChannel);
Assertions.assertNotNull(result);
- Assertions.assertTrue(result.isUnknown);
+ Assertions.assertEquals(ProtocolType.PROTOCOL_WITHOUT_IP,
result.pType);
}
@Test(expected = IOException.class)
@@ -105,13 +121,22 @@ public class ProxyProtocolHandlerTest {
testChannel = new TestChannel(data);
ProxyProtocolHandler.ProxyProtocolResult result =
ProxyProtocolHandler.handle(testChannel);
Assertions.assertNotNull(result);
- Assertions.assertFalse(result.isUnknown);
+ Assertions.assertEquals(ProtocolType.PROTOCOL_WITH_IP, result.pType);
Assertions.assertEquals("2001:db8:0:1:1:1:1:1", result.sourceIP);
Assertions.assertEquals(12345, result.sourcePort);
Assertions.assertEquals("2001:db8:0:1:1:1:1:2", result.destIp);
Assertions.assertEquals(54321, result.destPort);
}
+ @Test
+ public void handleNotProxyProtocol() throws IOException {
+ byte[] data = new byte[] {};
+ testChannel = new TestChannel(data);
+ testChannel.setTestReadReturn(0);
+ ProxyProtocolHandler.ProxyProtocolResult result =
ProxyProtocolHandler.handle(testChannel);
+ Assertions.assertEquals(ProtocolType.NOT_PROXY_PROTOCOL, result.pType);
+ }
+
@Test(expected = IOException.class)
public void handleV1ProtocolWithInvalidIPv6Data() throws IOException {
byte[] data = "PROXY TCP6 2001:db8:0:1:1:1:1:1 2001:db8:0:1:1:1:1:2
12345 EXTRA DATA\r\n".getBytes();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]