This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 20186b31f45 [enhancement](mysql-channel) avoid potential buffer
overflow when flushing send buffer occurs IOE (#30868)
20186b31f45 is described below
commit 20186b31f459b8698b6ac455f57c7ff7aa299454
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Feb 8 12:02:53 2024 +0800
[enhancement](mysql-channel) avoid potential buffer overflow when flushing
send buffer occurs IOE (#30868)
---
.../java/org/apache/doris/mysql/MysqlChannel.java | 22 +++---
.../org/apache/doris/mysql/MysqlChannelTest.java | 92 ++++++++++++++++++++++
2 files changed, 104 insertions(+), 10 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index 71eaf59863d..5dfa7947abe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -401,8 +401,11 @@ public class MysqlChannel {
return;
}
sendBuffer.flip();
- realNetSend(sendBuffer);
- sendBuffer.clear();
+ try {
+ realNetSend(sendBuffer);
+ } finally {
+ sendBuffer.clear();
+ }
isSend = true;
}
@@ -423,18 +426,17 @@ public class MysqlChannel {
sendBuffer.put((byte) sequenceId);
}
- private void writeBuffer(ByteBuffer buffer, boolean isSsl) throws
IOException {
+ private void writeBuffer(ByteBuffer buffer) throws IOException {
if (null == sendBuffer) {
return;
}
- long leftLength = sendBuffer.capacity() - sendBuffer.position();
// If too long for buffer, send buffered data.
- if (leftLength < buffer.remaining()) {
+ if (sendBuffer.remaining() < buffer.remaining()) {
// Flush data in buffer.
flush();
}
// Send this buffer if large enough
- if (buffer.remaining() > sendBuffer.capacity()) {
+ if (buffer.remaining() > sendBuffer.remaining()) {
realNetSend(buffer);
return;
}
@@ -451,20 +453,20 @@ public class MysqlChannel {
bufLen = MAX_PHYSICAL_PACKET_LENGTH;
packet.limit(packet.position() + bufLen);
if (isSslHandshaking) {
- writeBuffer(packet, true);
+ writeBuffer(packet);
} else {
writeHeader(bufLen, isSslMode);
- writeBuffer(packet, isSslMode);
+ writeBuffer(packet);
accSequenceId();
}
}
if (isSslHandshaking) {
packet.limit(oldLimit);
- writeBuffer(packet, true);
+ writeBuffer(packet);
} else {
writeHeader(oldLimit - packet.position(), isSslMode);
packet.limit(oldLimit);
- writeBuffer(packet, isSslMode);
+ writeBuffer(packet);
accSequenceId();
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlChannelTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlChannelTest.java
new file mode 100644
index 00000000000..78d18b58bad
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlChannelTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mysql;
+
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.qe.ConnectContext;
+
+import mockit.Delegate;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+import org.xnio.StreamConnection;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class MysqlChannelTest {
+
+ @Mocked
+ StreamConnection streamConnection;
+
+ @Test
+ public void testSendAfterException() throws IOException {
+ // Mock.
+ new Expectations() {
+ {
+ streamConnection.getSinkChannel().write((ByteBuffer) any);
+ // The first call to `write()` throws IOException.
+ result = new IOException();
+ // The second call to `write()` executes normally.
+ result = new Delegate() {
+ int fakeRead(ByteBuffer buffer) {
+ int writeLen = buffer.remaining();
+ buffer.position(buffer.limit());
+ return writeLen;
+ }
+ };
+
+ streamConnection.getSinkChannel().flush();
+ result = true;
+ }
+ };
+
+ ConnectContext ctx = new ConnectContext(streamConnection);
+ MysqlChannel mysqlChannel = new MysqlChannel(streamConnection, ctx);
+ Deencapsulation.setField(mysqlChannel, "sendBuffer",
ByteBuffer.allocate(5));
+ // The first call to `realNetSend()` in `flush()` throws IOException.
+ // If `flush()` doesn't consider this exception, `sendBuffer` won't be
reset to write mode,
+ // which will cause BufferOverflowException at the next calling
`sendOnePacket()`.
+ ByteBuffer buf = ByteBuffer.allocate(12);
+ buf.putInt(1);
+ buf.putInt(2);
+ // limit=8
+ buf.flip();
+ try {
+ mysqlChannel.sendOnePacket(buf);
+ Assert.fail();
+ } catch (IOException ignore) {
+ // do nothing
+ }
+ buf.clear();
+
+ buf.putInt(1);
+ // limit=4
+ buf.flip();
+ mysqlChannel.sendOnePacket(buf);
+ buf.clear();
+
+ buf.putInt(1);
+ buf.putInt(2);
+ // limit=8
+ buf.flip();
+ mysqlChannel.sendOnePacket(buf);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]