This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 32ada2487d Fix heartbeat failed if serialization changed (#11512)
32ada2487d is described below
commit 32ada2487d570e675a1811e7a4bd9332373bfe6e
Author: Albumen Kevin <[email protected]>
AuthorDate: Mon Feb 27 20:31:00 2023 +0800
Fix heartbeat failed if serialization changed (#11512)
fixes #11268
---
.../dubbo/remoting/exchange/HeartBeatRequest.java | 33 ++++++++++++++++++++++
.../dubbo/remoting/exchange/HeartBeatResponse.java | 33 ++++++++++++++++++++++
.../remoting/exchange/codec/ExchangeCodec.java | 18 +++++++-----
.../exchange/support/header/HeartbeatHandler.java | 10 +++++--
.../dubbo/rpc/protocol/dubbo/DubboCodec.java | 22 ++++++++++-----
5 files changed, 99 insertions(+), 17 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatRequest.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatRequest.java
new file mode 100644
index 0000000000..3cacbb9577
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatRequest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.exchange;
+
+public class HeartBeatRequest extends Request {
+ private byte proto;
+
+ public HeartBeatRequest(long id) {
+ super(id);
+ }
+
+ public byte getProto() {
+ return proto;
+ }
+
+ public void setProto(byte proto) {
+ this.proto = proto;
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatResponse.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatResponse.java
new file mode 100644
index 0000000000..35d4477c49
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatResponse.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.exchange;
+
+public class HeartBeatResponse extends Response{
+ private byte proto;
+
+ public HeartBeatResponse(long id, String version) {
+ super(id, version);
+ }
+
+ public byte getProto() {
+ return proto;
+ }
+
+ public void setProto(byte proto) {
+ this.proto = proto;
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index d631030637..9937c573fb 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.buffer.ChannelBufferInputStream;
import org.apache.dubbo.remoting.buffer.ChannelBufferOutputStream;
+import org.apache.dubbo.remoting.exchange.HeartBeatRequest;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
@@ -184,31 +185,34 @@ public class ExchangeCodec extends TelnetCodec {
return res;
} else {
// decode request.
- Request req = new Request(id);
- req.setVersion(Version.getProtocolVersion());
- req.setTwoWay((flag & FLAG_TWOWAY) != 0);
- if ((flag & FLAG_EVENT) != 0) {
- req.setEvent(true);
- }
+ Request req;
try {
Object data;
- if (req.isEvent()) {
+ if ((flag & FLAG_EVENT) != 0) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
+ req = new HeartBeatRequest(id);
+ ((HeartBeatRequest) req).setProto(proto);
data = null;
} else {
+ req = new Request(id);
data = decodeEventData(channel,
CodecSupport.deserialize(channel.getUrl(), new
ByteArrayInputStream(eventPayload), proto), eventPayload);
}
+ req.setEvent(true);
} else {
+ req = new Request(id);
data = decodeRequestData(channel,
CodecSupport.deserialize(channel.getUrl(), is, proto));
}
req.setData(data);
} catch (Throwable t) {
// bad request
+ req = new Request(id);
req.setBroken(true);
req.setData(t);
}
+ req.setVersion(Version.getProtocolVersion());
+ req.setTwoWay((flag & FLAG_TWOWAY) != 0);
return req;
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
index 38a10e34cf..f1bb03cbef 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
@@ -23,6 +23,8 @@ import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.HeartBeatRequest;
+import org.apache.dubbo.remoting.exchange.HeartBeatResponse;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate;
@@ -65,10 +67,12 @@ public class HeartbeatHandler extends
AbstractChannelHandlerDelegate {
public void received(Channel channel, Object message) throws
RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
- Request req = (Request) message;
+ HeartBeatRequest req = (HeartBeatRequest) message;
if (req.isTwoWay()) {
- Response res = new Response(req.getId(), req.getVersion());
+ HeartBeatResponse res;
+ res = new HeartBeatResponse(req.getId(), req.getVersion());
res.setEvent(HEARTBEAT_EVENT);
+ res.setProto(req.getProto());
channel.send(res);
if (logger.isDebugEnabled()) {
int heartbeat =
channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
@@ -105,7 +109,7 @@ public class HeartbeatHandler extends
AbstractChannelHandlerDelegate {
}
private boolean isHeartbeatRequest(Object message) {
- return message instanceof Request && ((Request) message).isHeartbeat();
+ return message instanceof HeartBeatRequest && ((Request)
message).isHeartbeat();
}
private boolean isHeartbeatResponse(Object message) {
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 28b1453588..88a0f5622d 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -27,6 +27,8 @@ import org.apache.dubbo.common.serialize.Serialization;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.exchange.HeartBeatRequest;
+import org.apache.dubbo.remoting.exchange.HeartBeatResponse;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
@@ -131,24 +133,24 @@ public class DubboCodec extends ExchangeCodec {
return res;
} else {
// decode request.
- Request req = new Request(id);
- req.setVersion(Version.getProtocolVersion());
- req.setTwoWay((flag & FLAG_TWOWAY) != 0);
- if ((flag & FLAG_EVENT) != 0) {
- req.setEvent(true);
- }
+ Request req;
try {
Object data;
- if (req.isEvent()) {
+ if ((flag & FLAG_EVENT) != 0) {
byte[] eventPayload = CodecSupport.getPayload(is);
if (CodecSupport.isHeartBeat(eventPayload, proto)) {
// heart beat response data is always null;
+ req = new HeartBeatRequest(id);
+ ((HeartBeatRequest) req).setProto(proto);
data = null;
} else {
+ req = new Request(id);
ObjectInput in =
CodecSupport.deserialize(channel.getUrl(), new
ByteArrayInputStream(eventPayload), proto);
data = decodeEventData(channel, in, eventPayload);
}
+ req.setEvent(true);
} else {
+ req = new HeartBeatRequest(id);
DecodeableRpcInvocation inv;
if (isDecodeDataInIoThread(channel)) {
inv = new DecodeableRpcInvocation(frameworkModel,
channel, req, is, proto);
@@ -165,9 +167,12 @@ public class DubboCodec extends ExchangeCodec {
log.warn(PROTOCOL_FAILED_DECODE, "", "", "Decode request
failed: " + t.getMessage(), t);
}
// bad request
+ req = new HeartBeatRequest(id);
req.setBroken(true);
req.setData(t);
}
+ req.setVersion(Version.getProtocolVersion());
+ req.setTwoWay((flag & FLAG_TWOWAY) != 0);
return req;
}
@@ -267,6 +272,9 @@ public class DubboCodec extends ExchangeCodec {
@Override
protected Serialization getSerialization(Channel channel, Response res) {
+ if (res instanceof HeartBeatResponse) {
+ return CodecSupport.getSerializationById(((HeartBeatResponse)
res).getProto());
+ }
if (!(res.getResult() instanceof AppResponse)) {
return super.getSerialization(channel, res);
}