This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 35f00f0395 Reject if response do not match any request (#11882)
35f00f0395 is described below
commit 35f00f0395fbd81ee708ea0e031da3cdad5ef2fb
Author: Albumen Kevin <[email protected]>
AuthorDate: Sat Mar 25 09:37:31 2023 +0800
Reject if response do not match any request (#11882)
---
.../remoting/exchange/codec/ExchangeCodec.java | 27 ++++++++++++++--------
.../dubbo/remoting/codec/ExchangeCodecTest.java | 17 ++++++++++++++
.../dubbo/rpc/protocol/dubbo/DubboCodec.java | 4 ++--
.../rpc/protocol/dubbo/DubboCountCodecTest.java | 13 ++++++++---
4 files changed, 46 insertions(+), 15 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index 44cd564f57..6fdbbea497 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -42,6 +42,8 @@ import org.apache.dubbo.remoting.transport.ExceedPayloadLimitException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
/**
* ExchangeCodec.
@@ -167,7 +169,7 @@ public class ExchangeCodec extends TelnetCodec {
data = decodeEventData(channel,
CodecSupport.deserialize(channel.getUrl(), new
ByteArrayInputStream(eventPayload), proto), eventPayload);
}
} else {
- data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(id));
+ data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id));
}
res.setResult(data);
} else {
@@ -209,16 +211,21 @@ public class ExchangeCodec extends TelnetCodec {
}
}
- protected Object getRequestData(long id) {
+ protected Object getRequestData(Channel channel, Response response, long id) {
DefaultFuture future = DefaultFuture.getFuture(id);
- if (future == null) {
- return null;
- }
- Request req = future.getRequest();
- if (req == null) {
- return null;
+ if (future != null) {
+ Request req = future.getRequest();
+ if (req != null) {
+ return req.getData();
+ }
}
- return req.getData();
+
+ logger.warn("The timeout response finally returned at "
+ + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ + ", response status is " + response.getStatus() + ", response id is " + response.getId()
+ + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
+ throw new IllegalArgumentException("Failed to find any request match the response, response id: " + id);
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer,
Request req) throws IOException {
@@ -427,7 +434,7 @@ public class ExchangeCodec extends TelnetCodec {
try {
if (eventBytes != null) {
int dataLen = eventBytes.length;
- int threshold = ConfigurationUtils.getSystemConfiguration(channel.getUrl().getScopeModel()).getInt("deserialization.event.size", 50);
+ int threshold = ConfigurationUtils.getSystemConfiguration(channel.getUrl().getScopeModel()).getInt("deserialization.event.size", 15);
if (dataLen > threshold) {
throw new IllegalArgumentException("Event data too long, actual size " + threshold + ", threshold " + threshold + " rejected for security consideration.");
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
index 495092d8bb..54247f28af 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/codec/ExchangeCodecTest.java
@@ -30,11 +30,13 @@ import org.apache.dubbo.remoting.buffer.ChannelBuffers;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -137,6 +139,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
@Test
public void test_Decode_Error_Length() throws IOException {
+ DefaultFuture future =
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000,
null);
+
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
byte[] request = getRequestBytes(person, header);
@@ -148,6 +152,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
Assertions.assertEquals(person, obj.getResult());
//only decode necessary bytes
Assertions.assertEquals(request.length, buffer.readerIndex());
+
+ future.cancel();
}
@Test
@@ -226,6 +232,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
@Test
public void test_Decode_Return_Response_Person() throws IOException {
+ DefaultFuture future =
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000,
null);
//00000010-response/oneway/hearbeat=false/hessian
|20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 2, 20, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
@@ -235,6 +242,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);
+
+ future.cancel();
}
@Test //The status input has a problem, and the read information is wrong
when the serialization is serialized.
@@ -324,6 +333,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
@Test
public void test_Header_Response_NoSerializationFlag() throws IOException {
+ DefaultFuture future =
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000,
null);
//00000010-response/oneway/hearbeat=false/noset
|20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte) 0x02, 20, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
@@ -333,10 +343,13 @@ public class ExchangeCodecTest extends TelnetCodecTest {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);
+
+ future.cancel();
}
@Test
public void test_Header_Response_Heartbeat() throws IOException {
+ DefaultFuture future =
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(0), 100000,
null);
//00000010-response/oneway/hearbeat=true |20-stats=ok|id=0|length=0
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 0x02, 20, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0};
Person person = new Person();
@@ -346,6 +359,8 @@ public class ExchangeCodecTest extends TelnetCodecTest {
Assertions.assertEquals(20, obj.getStatus());
Assertions.assertEquals(person, obj.getResult());
System.out.println(obj);
+
+ future.cancel();
}
@Test
@@ -371,6 +386,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
@Test
public void test_Encode_Response() throws IOException {
+ DefaultFuture future =
DefaultFuture.newFuture(Mockito.mock(Channel.class), new Request(1001), 100000,
null);
ChannelBuffer encodeBuffer = ChannelBuffers.dynamicBuffer(1024);
Channel channel = getClientSideChannel(url);
Response response = new Response();
@@ -396,6 +412,7 @@ public class ExchangeCodecTest extends TelnetCodecTest {
// encode response verson ??
// Assertions.assertEquals(response.getProtocolVersion(),
obj.getVersion());
+ future.cancel();
}
@Test
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 913138dc66..652a5f9e42 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -101,12 +101,12 @@ public class DubboCodec extends ExchangeCodec {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
- (Invocation) getRequestData(id), proto);
+ (Invocation) getRequestData(channel, res, id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
- (Invocation) getRequestData(id), proto);
+ (Invocation) getRequestData(channel, res, id), proto);
}
data = result;
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
index b531bb1aa0..212efd5fd4 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCountCodecTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.buffer.ChannelBuffers;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
import org.apache.dubbo.remoting.exchange.support.MultiMessage;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcInvocation;
@@ -32,7 +33,9 @@ import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import static org.apache.dubbo.rpc.Constants.INPUT_KEY;
import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY;
@@ -45,23 +48,25 @@ public class DubboCountCodecTest {
ChannelBuffer buffer = ChannelBuffers.buffer(1024);
Channel channel = new MockChannel();
Assertions.assertEquals(Codec2.DecodeResult.NEED_MORE_INPUT,
dubboCountCodec.decode(channel, buffer));
+ List<DefaultFuture> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- Request request = new Request(1);
+ Request request = new Request(i);
+ futures.add(DefaultFuture.newFuture(channel, request, 1000, null));
RpcInvocation rpcInvocation = new RpcInvocation(null, "echo",
DemoService.class.getName(), "", new Class<?>[]{String.class}, new
String[]{"yug"});
request.setData(rpcInvocation);
dubboCountCodec.encode(channel, buffer, request);
}
for (int i = 0; i < 10; i++) {
- Response response = new Response(1);
+ Response response = new Response(i);
AppResponse appResponse = new AppResponse(i);
response.setResult(appResponse);
dubboCountCodec.encode(channel, buffer, response);
}
MultiMessage multiMessage = (MultiMessage)
dubboCountCodec.decode(channel, buffer);
- Assertions.assertEquals(multiMessage.size(), 20);
+ Assertions.assertEquals(20, multiMessage.size());
int requestCount = 0;
int responseCount = 0;
Iterator iterator = multiMessage.iterator();
@@ -79,6 +84,8 @@ public class DubboCountCodecTest {
}
Assertions.assertEquals(requestCount, 10);
Assertions.assertEquals(responseCount, 10);
+
+ futures.forEach(DefaultFuture::cancel);
}
}