This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 8700256bf2 Unify graceful shutdown interface,add Triple support
(#15926)
8700256bf2 is described below
commit 8700256bf25e2b313fe2c924b8124a94ea3fd175
Author: earthchen <[email protected]>
AuthorDate: Tue Jan 6 10:52:14 2026 +0800
Unify graceful shutdown interface,add Triple support (#15926)
* Unify the graceful shutdown interface, remove the graceful shutdown logic
from the shutdown hook, and rely on performing graceful shutdown when closing
connections.
* fix mvn spotless:apply
* fix compile
* revert hook logic
* fix
* fix checkstyle
* docs: enhance JavaDoc for graceful shutdown components
Add comprehensive JavaDoc documentation and unit tests for the unified
graceful shutdown interface changes:
- ChannelEvent: add protocol handling table, @see tags
- ReadOnlyEvent/WriteableEvent: add protocol behavior descriptions
- AbstractGracefulShutdown: add architecture diagram, implementation guide
- DefaultProtocolServer: add class-level documentation
- GracefulShutdown: add RFC 7540 reference, method documentation
- TripleServerConnectionHandler: add userEventTriggered documentation
- DubboGracefulShutdown/TripleGracefulShutdown: enhance method docs
Add unit tests (25 test cases):
- DefaultProtocolServerTest (11 tests)
- AbstractGracefulShutdownTest (6 tests)
- DubboGracefulShutdownTest (7 tests)
- TripleGracefulShutdownTest (7 tests)
---
.../org/apache/dubbo/remoting/ChannelEvent.java | 48 ++++++
.../org/apache/dubbo/remoting/RemotingServer.java | 9 ++
.../apache/dubbo/remoting/event/ReadOnlyEvent.java | 58 +++++++
.../dubbo/remoting/event/WriteableEvent.java | 58 +++++++
.../exchange/support/ExchangeServerDelegate.java | 6 +
.../support/header/HeaderExchangeServer.java | 22 ++-
.../dubbo/remoting/transport/AbstractServer.java | 13 ++
.../dubbo/remoting/transport/ServerDelegate.java | 6 +
.../transport/netty4/NettyConnectionHandler.java | 2 +
.../netty4/NettyPortUnificationServer.java | 40 +++++
.../remoting/transport/netty4/NettyServer.java | 39 +++++
.../apache/dubbo/rpc/AbstractGracefulShutdown.java | 105 ++++++++++++
.../apache/dubbo/rpc/DefaultProtocolServer.java} | 44 +++++-
.../dubbo/rpc/protocol/AbstractProtocol.java | 3 +-
.../dubbo/rpc/protocol/AbstractProxyProtocol.java | 6 +
.../dubbo/rpc/AbstractGracefulShutdownTest.java | 176 +++++++++++++++++++++
.../dubbo/rpc/DefaultProtocolServerTest.java | 171 ++++++++++++++++++++
.../rpc/protocol/dubbo/DubboGracefulShutdown.java | 101 ++++++------
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 3 +-
.../protocol/dubbo/DubboGracefulShutdownTest.java | 168 ++++++++++++++++++++
.../rpc/protocol/tri/TripleGracefulShutdown.java | 93 +++++++++++
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 9 +-
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 11 +-
.../protocol/tri/transport/GracefulShutdown.java | 116 +++++++++++++-
.../transport/TripleServerConnectionHandler.java | 28 ++++
.../protocol/tri/TripleGracefulShutdownTest.java | 168 ++++++++++++++++++++
26 files changed, 1442 insertions(+), 61 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/ChannelEvent.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/ChannelEvent.java
new file mode 100644
index 0000000000..944fd1a694
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/ChannelEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting;
+
+/**
+ * Channel event that can be fired to channels.
+ * <p>
+ * This interface represents a custom event that can be fired to all connected
channels
+ * through the {@link RemotingServer#fireChannelEvent(ChannelEvent)} method.
+ * Different protocols can interpret and handle these events in their own way,
+ * providing a generic mechanism for sending custom events across protocols.
+ * </p>
+ *
+ * <h3>Built-in Events</h3>
+ * <ul>
+ * <li>{@link org.apache.dubbo.remoting.event.ReadOnlyEvent} - Notifies
clients that the server is entering
+ * read-only mode (typically for graceful shutdown)</li>
+ * <li>{@link org.apache.dubbo.remoting.event.WriteableEvent} - Notifies
clients that the server is resuming
+ * normal operation (cancelling graceful shutdown)</li>
+ * </ul>
+ *
+ * <h3>Protocol-specific Handling</h3>
+ * <table border="1">
+ * <tr><th>Protocol</th><th>ReadOnlyEvent</th><th>WriteableEvent</th></tr>
+ * <tr><td>Dubbo</td><td>Sends READONLY_EVENT request</td><td>Sends
WRITEABLE_EVENT request</td></tr>
+ * <tr><td>Triple (HTTP/2)</td><td>Sends GOAWAY frame</td><td>Not supported
(GOAWAY is irreversible)</td></tr>
+ * </table>
+ *
+ * @see RemotingServer#fireChannelEvent(ChannelEvent)
+ * @see org.apache.dubbo.remoting.event.ReadOnlyEvent
+ * @see org.apache.dubbo.remoting.event.WriteableEvent
+ * @since 3.3
+ */
+public interface ChannelEvent {}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/RemotingServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/RemotingServer.java
index 29fe60ace2..44c47ed09e 100755
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/RemotingServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/RemotingServer.java
@@ -44,6 +44,15 @@ public interface RemotingServer extends Endpoint, Resetable,
IdleSensible {
*/
Collection<Channel> getChannels();
+ /**
+ * Fire a custom event to all connected channels.
+ * <p>
+ * Different protocols can interpret and handle these events in their own
way.
+ *
+ * @param event the event to fire
+ */
+ void fireChannelEvent(ChannelEvent event);
+
/**
* get channel.
*
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/event/ReadOnlyEvent.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/event/ReadOnlyEvent.java
new file mode 100644
index 0000000000..f618a83ee2
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/event/ReadOnlyEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.event;
+
+import org.apache.dubbo.remoting.ChannelEvent;
+
+/**
+ * Read-only event for graceful shutdown.
+ * <p>
+ * This event indicates that the server is entering read-only mode and will
not accept
+ * new requests. It is typically fired during graceful shutdown to notify
connected clients
+ * that they should switch to other available providers.
+ * </p>
+ *
+ * <h3>Protocol-specific Behavior</h3>
+ * <p>When this event is fired to a channel:</p>
+ * <ul>
+ * <li><b>Dubbo protocol:</b> Sends a READONLY_EVENT request to the client.
+ * The client will mark this provider as unavailable and prefer other
providers for new requests.</li>
+ * <li><b>Triple protocol (HTTP/2):</b> Sends an HTTP/2 GOAWAY frame with
NO_ERROR code.
+ * The GOAWAY frame indicates that the server will not accept new
streams (requests)
+ * but existing streams can continue until completion.</li>
+ * </ul>
+ *
+ * <h3>Usage</h3>
+ * <p>This is a singleton class. Use {@link #INSTANCE} to get the instance.</p>
+ *
+ * @see org.apache.dubbo.remoting.RemotingServer#fireChannelEvent(ChannelEvent)
+ * @see WriteableEvent
+ * @see org.apache.dubbo.rpc.GracefulShutdown#readonly()
+ * @since 3.3
+ */
+public class ReadOnlyEvent implements ChannelEvent {
+
+ /**
+ * The singleton instance of ReadOnlyEvent.
+ */
+ public static final ReadOnlyEvent INSTANCE = new ReadOnlyEvent();
+
+ /**
+ * Private constructor to enforce singleton pattern.
+ */
+ private ReadOnlyEvent() {}
+}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/event/WriteableEvent.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/event/WriteableEvent.java
new file mode 100644
index 0000000000..69202002c4
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/event/WriteableEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.event;
+
+import org.apache.dubbo.remoting.ChannelEvent;
+
+/**
+ * Writeable event to resume normal operation after graceful shutdown is
cancelled.
+ * <p>
+ * This event indicates that the server is resuming normal operation and can
accept
+ * new requests again. It is typically fired when a graceful shutdown is
cancelled
+ * before completion.
+ * </p>
+ *
+ * <h3>Protocol-specific Behavior</h3>
+ * <p>When this event is fired to a channel:</p>
+ * <ul>
+ * <li><b>Dubbo protocol:</b> Sends a WRITEABLE_EVENT request to the client.
+ * The client will mark this provider as available again and can use it
for new requests.</li>
+ * <li><b>Triple protocol (HTTP/2):</b> <em>Not supported.</em> HTTP/2
GOAWAY frame is a one-way
+ * notification that cannot be reversed. Once a GOAWAY frame is sent,
the connection
+ * is in graceful shutdown mode and cannot be resumed.</li>
+ * </ul>
+ *
+ * <h3>Usage</h3>
+ * <p>This is a singleton class. Use {@link #INSTANCE} to get the instance.</p>
+ *
+ * @see org.apache.dubbo.remoting.RemotingServer#fireChannelEvent(ChannelEvent)
+ * @see ReadOnlyEvent
+ * @see org.apache.dubbo.rpc.GracefulShutdown#writeable()
+ * @since 3.3
+ */
+public class WriteableEvent implements ChannelEvent {
+
+ /**
+ * The singleton instance of WriteableEvent.
+ */
+ public static final WriteableEvent INSTANCE = new WriteableEvent();
+
+ /**
+ * Private constructor to enforce singleton pattern.
+ */
+ private WriteableEvent() {}
+}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/ExchangeServerDelegate.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/ExchangeServerDelegate.java
index 6a35949f5b..8a3f7d072e 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/ExchangeServerDelegate.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/ExchangeServerDelegate.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.remoting.exchange.support;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelEvent;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
@@ -65,6 +66,11 @@ public class ExchangeServerDelegate implements
ExchangeServer {
return server.getChannels();
}
+ @Override
+ public void fireChannelEvent(ChannelEvent event) {
+ server.fireChannelEvent(event);
+ }
+
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
return server.getChannel(remoteAddress);
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
index 2204076aba..0687a2a30c 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
@@ -26,10 +26,13 @@ import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelEvent;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
+import org.apache.dubbo.remoting.event.WriteableEvent;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Request;
@@ -43,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.unmodifiableCollection;
import static org.apache.dubbo.common.constants.CommonConstants.READONLY_EVENT;
+import static
org.apache.dubbo.common.constants.CommonConstants.WRITEABLE_EVENT;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE;
@@ -108,7 +112,7 @@ public class HeaderExchangeServer implements ExchangeServer
{
if (timeout > 0) {
final long start = System.currentTimeMillis();
if
(getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
- sendChannelReadOnlyEvent();
+ sendChannelEvent(READONLY_EVENT);
}
while (HeaderExchangeServer.this.isRunning() &&
System.currentTimeMillis() - start < (long) timeout) {
try {
@@ -127,9 +131,9 @@ public class HeaderExchangeServer implements ExchangeServer
{
server.startClose();
}
- private void sendChannelReadOnlyEvent() {
+ private void sendChannelEvent(String event) {
Request request = new Request();
- request.setEvent(READONLY_EVENT);
+ request.setEvent(event);
request.setTwoWay(false);
request.setVersion(Version.getProtocolVersion());
@@ -271,4 +275,16 @@ public class HeaderExchangeServer implements
ExchangeServer {
this.closeTimer = new CloseTimerTask(cp, IDLE_CHECK_TIMER.get(),
closeTimeoutTick, closeTimeout);
}
}
+
+ @Override
+ public void fireChannelEvent(ChannelEvent event) {
+ if (event instanceof ReadOnlyEvent) {
+ sendChannelEvent(READONLY_EVENT);
+ } else if (event instanceof WriteableEvent) {
+ sendChannelEvent(WRITEABLE_EVENT);
+ } else {
+ // For other events, delegate to the underlying server
+ server.fireChannelEvent(event);
+ }
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index 5b154156b3..7fe78f054b 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelEvent;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
@@ -198,4 +199,16 @@ public abstract class AbstractServer extends
AbstractEndpoint implements Remotin
}
super.disconnected(ch);
}
+
+ @Override
+ public void fireChannelEvent(ChannelEvent event) {
+ // Default implementation does nothing.
+ // Subclasses can override this method to implement protocol-specific
event handling.
+ logger.warn(
+ INTERNAL_ERROR,
+ "unknown error in remoting module",
+ "",
+ "The fireChannelEvent method is not implemented for "
+ + getClass().getName() + ", event: " + event);
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/ServerDelegate.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/ServerDelegate.java
index 76477df145..07eca24f8e 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/ServerDelegate.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/ServerDelegate.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.remoting.transport;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelEvent;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
@@ -118,4 +119,9 @@ public class ServerDelegate implements RemotingServer {
public boolean isClosed() {
return server.isClosed();
}
+
+ @Override
+ public void fireChannelEvent(ChannelEvent event) {
+ server.fireChannelEvent(event);
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
index 16b14a0a81..2e17e75488 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.connection.ConnectionHandler;
import java.util.concurrent.TimeUnit;
@@ -57,6 +58,7 @@ public class NettyConnectionHandler extends
ChannelInboundHandlerAdapter impleme
attr.set(true);
if (connectionClient != null) {
+
connectionClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, true);
connectionClient.onGoaway(nettyChannel);
}
if (LOGGER.isDebugEnabled()) {
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
index 8bbdfb188f..9c29379b30 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelEvent;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
@@ -234,4 +235,43 @@ public class NettyPortUnificationServer extends
AbstractPortUnificationServer {
public boolean canHandleIdle() {
return true;
}
+
+ @Override
+ public void fireChannelEvent(ChannelEvent event) {
+ Collection<Channel> channels = getChannels();
+ if (CollectionUtils.isEmpty(channels)) {
+ return;
+ }
+ for (Channel channel : channels) {
+ try {
+ if (channel.isConnected()) {
+ fireChannelEventToChannel(channel, event);
+ }
+ } catch (Throwable e) {
+ logger.warn(
+ TRANSPORT_FAILED_CLOSE,
+ "",
+ "",
+ "Failed to fire channel event to channel: " + channel
+ ", event: " + event,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Fire ChannelEvent to the channel.
+ * The event will be handled by protocol-specific handlers (e.g.,
TripleServerConnectionHandler).
+ *
+ * @param channel the Dubbo channel
+ * @param event the channel event to fire
+ */
+ private void fireChannelEventToChannel(Channel channel, ChannelEvent
event) {
+ // Get the underlying netty channel and fire the user event
+ if (channel instanceof NettyChannel) {
+ io.netty.channel.Channel nettyChannel = ((NettyChannel)
channel).getNioChannel();
+ if (nettyChannel != null && nettyChannel.isActive()) {
+ nettyChannel.pipeline().fireUserEventTriggered(event);
+ }
+ }
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
index b5a7b29c65..69af2ffc5c 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.registry.event.NettyEvent;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelEvent;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
@@ -282,4 +283,42 @@ public class NettyServer extends AbstractServer {
protected Map<String, Channel> getServerChannels() {
return channels;
}
+
+ @Override
+ public void fireChannelEvent(ChannelEvent event) {
+ Collection<Channel> channels = getChannels();
+ if (CollectionUtils.isEmpty(channels)) {
+ return;
+ }
+ for (Channel channel : channels) {
+ try {
+ if (channel.isConnected()) {
+ fireChannelEventToChannel(channel, event);
+ }
+ } catch (Throwable e) {
+ logger.warn(
+ TRANSPORT_FAILED_CLOSE,
+ "",
+ "",
+ "Failed to fire channel event to channel: " + channel
+ ", event: " + event,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Fire ChannelEvent to the channel.
+ * The event will be handled by protocol-specific handlers.
+ *
+ * @param channel the Dubbo channel
+ * @param event the channel event to fire
+ */
+ private void fireChannelEventToChannel(Channel channel, ChannelEvent
event) {
+ if (channel instanceof NettyChannel) {
+ io.netty.channel.Channel nettyChannel = ((NettyChannel)
channel).getNioChannel();
+ if (nettyChannel != null && nettyChannel.isActive()) {
+ nettyChannel.pipeline().fireUserEventTriggered(event);
+ }
+ }
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractGracefulShutdown.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractGracefulShutdown.java
new file mode 100644
index 0000000000..d04439be15
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AbstractGracefulShutdown.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.ChannelEvent;
+
+import java.util.Collection;
+
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE_STREAM;
+
+/**
+ * Abstract base class for graceful shutdown implementations.
+ * <p>
+ * This class provides common functionality for graceful shutdown across
different protocols.
+ * It implements the {@link GracefulShutdown} interface and provides a
template for
+ * protocol-specific implementations.
+ * </p>
+ *
+ * <h3>Architecture</h3>
+ * <p>The graceful shutdown mechanism works as follows:</p>
+ * <pre>
+ * GracefulShutdown
+ * │
+ * ▼
+ * AbstractGracefulShutdown
+ * / \
+ * / \
+ * DubboGracefulShutdown TripleGracefulShutdown
+ * │ │
+ * ▼ ▼
+ * READONLY_EVENT GOAWAY Frame
+ * </pre>
+ *
+ * <h3>Implementation Guide</h3>
+ * <p>Subclasses must implement:</p>
+ * <ul>
+ * <li>{@link #getServers()} - Return the collection of protocol servers to
notify</li>
+ * <li>{@link #readonly()} - Fire the appropriate event to enter read-only
mode</li>
+ * <li>{@link #writeable()} - Fire the appropriate event to resume normal
operation (if supported)</li>
+ * </ul>
+ *
+ * @see GracefulShutdown
+ * @see org.apache.dubbo.rpc.protocol.dubbo.DubboGracefulShutdown
+ * @see org.apache.dubbo.rpc.protocol.tri.TripleGracefulShutdown
+ * @since 3.3
+ */
+public abstract class AbstractGracefulShutdown implements GracefulShutdown {
+
+ /**
+ * Logger for graceful shutdown operations.
+ */
+ protected final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(getClass());
+
+ /**
+ * Get the collection of protocol servers that need to be notified during
graceful shutdown.
+ * <p>
+ * Subclasses should return all active servers for the specific protocol.
+ * </p>
+ *
+ * @return collection of protocol servers, never null
+ */
+ protected abstract Collection<ProtocolServer> getServers();
+
+ /**
+ * Fire a channel event to all servers and their connected channels.
+ * <p>
+ * This method iterates through all protocol servers returned by {@link
#getServers()}
+ * and fires the given event to each server's remoting server. The event
will be
+ * propagated to all connected channels.
+ * </p>
+ * <p>
+ * Exceptions during event firing are logged but not propagated, ensuring
that
+ * failures on individual channels don't affect other channels.
+ * </p>
+ *
+ * @param event the channel event to fire (e.g., {@link
org.apache.dubbo.remoting.event.ReadOnlyEvent}
+ * or {@link org.apache.dubbo.remoting.event.WriteableEvent})
+ */
+ protected void fireChannelEvent(ChannelEvent event) {
+ try {
+ for (ProtocolServer server : getServers()) {
+ server.getRemotingServer().fireChannelEvent(event);
+ }
+ } catch (Throwable e) {
+ logger.warn(
+ TRANSPORT_FAILED_CLOSE_STREAM, "", "", "Failed to fire
channel event during graceful shutdown.", e);
+ }
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolServer.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/DefaultProtocolServer.java
similarity index 60%
rename from
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolServer.java
rename to
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/DefaultProtocolServer.java
index 46a5b4c55a..4d81c4be28 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocolServer.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/DefaultProtocolServer.java
@@ -14,23 +14,59 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.dubbo;
+package org.apache.dubbo.rpc;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.RemotingServer;
-import org.apache.dubbo.rpc.ProtocolServer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class DubboProtocolServer implements ProtocolServer {
+/**
+ * Default implementation of {@link ProtocolServer}.
+ * <p>
+ * This class wraps a {@link RemotingServer} and provides a simple
implementation
+ * of the {@link ProtocolServer} interface. It is used by protocols that don't
need
+ * special protocol server implementations, such as Triple protocol when using
+ * port unification.
+ * </p>
+ *
+ * <h3>Features</h3>
+ * <ul>
+ * <li>Wraps a {@link RemotingServer} instance</li>
+ * <li>Supports custom address override</li>
+ * <li>Provides thread-safe attribute storage</li>
+ * <li>Delegates all operations to the underlying remoting server</li>
+ * </ul>
+ *
+ * @see ProtocolServer
+ * @see RemotingServer
+ * @since 3.3
+ */
+public class DefaultProtocolServer implements ProtocolServer {
+ /**
+ * The underlying remoting server.
+ */
private final RemotingServer server;
+
+ /**
+ * Custom address override. If null, the server's URL address is used.
+ */
private String address;
+
+ /**
+ * Thread-safe storage for custom attributes.
+ */
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
- public DubboProtocolServer(RemotingServer server) {
+ /**
+ * Create a new DefaultProtocolServer wrapping the given remoting server.
+ *
+ * @param server the underlying remoting server, must not be null
+ */
+ public DefaultProtocolServer(RemotingServer server) {
this.server = server;
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
index 2fe17714f3..6eca9b8026 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
@@ -42,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
@@ -62,7 +63,7 @@ public abstract class AbstractProtocol implements Protocol,
ScopeModelAware {
/**
* <host:port, ProtocolServer>
*/
- protected final Map<String, ProtocolServer> serverMap = new
ConcurrentHashMap<>();
+ protected final ConcurrentMap<String, ProtocolServer> serverMap = new
ConcurrentHashMap<>();
// TODO SoftReference
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<>();
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java
index 6123f22ff2..1bd9301136 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProxyProtocol.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.ChannelEvent;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
@@ -274,5 +275,10 @@ public abstract class AbstractProxyProtocol extends
AbstractProtocol {
public boolean isClosed() {
return false;
}
+
+ @Override
+ public void fireChannelEvent(ChannelEvent event) {
+ // Default implementation does nothing for proxy protocols
+ }
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/AbstractGracefulShutdownTest.java
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/AbstractGracefulShutdownTest.java
new file mode 100644
index 0000000000..c86e2263c0
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/AbstractGracefulShutdownTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.remoting.ChannelEvent;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
+import org.apache.dubbo.remoting.event.WriteableEvent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link AbstractGracefulShutdown}.
+ */
+class AbstractGracefulShutdownTest {
+
+ private ProtocolServer mockServer1;
+ private ProtocolServer mockServer2;
+ private RemotingServer mockRemotingServer1;
+ private RemotingServer mockRemotingServer2;
+ private List<ChannelEvent> capturedEvents;
+
+ @BeforeEach
+ void setUp() {
+ mockServer1 = Mockito.mock(ProtocolServer.class);
+ mockServer2 = Mockito.mock(ProtocolServer.class);
+ mockRemotingServer1 = Mockito.mock(RemotingServer.class);
+ mockRemotingServer2 = Mockito.mock(RemotingServer.class);
+
+ when(mockServer1.getRemotingServer()).thenReturn(mockRemotingServer1);
+ when(mockServer2.getRemotingServer()).thenReturn(mockRemotingServer2);
+
+ capturedEvents = new ArrayList<>();
+ }
+
+ /**
+ * Test fireChannelEvent fires event to single server.
+ */
+ @Test
+ void testFireChannelEventSingleServer() {
+ TestGracefulShutdown shutdown = new
TestGracefulShutdown(Collections.singletonList(mockServer1));
+
+ shutdown.readonly();
+
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ }
+
+ /**
+ * Test fireChannelEvent fires event to multiple servers.
+ */
+ @Test
+ void testFireChannelEventMultipleServers() {
+ TestGracefulShutdown shutdown = new
TestGracefulShutdown(Arrays.asList(mockServer1, mockServer2));
+
+ shutdown.readonly();
+
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ verify(mockRemotingServer2,
times(1)).fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ }
+
+ /**
+ * Test readonly sends ReadOnlyEvent.
+ */
+ @Test
+ void testReadonlySendsReadOnlyEvent() {
+ TestGracefulShutdown shutdown = new
TestGracefulShutdown(Collections.singletonList(mockServer1));
+
+ shutdown.readonly();
+
+ ArgumentCaptor<ChannelEvent> captor =
ArgumentCaptor.forClass(ChannelEvent.class);
+ verify(mockRemotingServer1).fireChannelEvent(captor.capture());
+
+ ChannelEvent capturedEvent = captor.getValue();
+ assertTrue(capturedEvent instanceof ReadOnlyEvent);
+ assertSame(ReadOnlyEvent.INSTANCE, capturedEvent);
+ }
+
+ /**
+ * Test writeable sends WriteableEvent.
+ */
+ @Test
+ void testWriteableSendsWriteableEvent() {
+ TestGracefulShutdown shutdown = new
TestGracefulShutdown(Collections.singletonList(mockServer1));
+
+ shutdown.writeable();
+
+ ArgumentCaptor<ChannelEvent> captor =
ArgumentCaptor.forClass(ChannelEvent.class);
+ verify(mockRemotingServer1).fireChannelEvent(captor.capture());
+
+ ChannelEvent capturedEvent = captor.getValue();
+ assertTrue(capturedEvent instanceof WriteableEvent);
+ assertSame(WriteableEvent.INSTANCE, capturedEvent);
+ }
+
+ /**
+ * Test that exceptions are caught and don't propagate.
+ */
+ @Test
+ void testExceptionsAreCaught() {
+ Mockito.doThrow(new RuntimeException("Test exception"))
+ .when(mockRemotingServer1)
+ .fireChannelEvent(Mockito.any(ChannelEvent.class));
+
+ TestGracefulShutdown shutdown = new
TestGracefulShutdown(Collections.singletonList(mockServer1));
+
+ // Should not throw exception
+ shutdown.readonly();
+ }
+
+ /**
+ * Test with empty server list.
+ */
+ @Test
+ void testEmptyServerList() {
+ TestGracefulShutdown shutdown = new
TestGracefulShutdown(Collections.emptyList());
+
+ // Should not throw exception
+ shutdown.readonly();
+ shutdown.writeable();
+ }
+
+ /**
+ * Test implementation of AbstractGracefulShutdown for testing purposes.
+ */
+ private static class TestGracefulShutdown extends AbstractGracefulShutdown
{
+ private final Collection<ProtocolServer> servers;
+
+ TestGracefulShutdown(Collection<ProtocolServer> servers) {
+ this.servers = servers;
+ }
+
+ @Override
+ protected Collection<ProtocolServer> getServers() {
+ return servers;
+ }
+
+ @Override
+ public void readonly() {
+ fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ }
+
+ @Override
+ public void writeable() {
+ fireChannelEvent(WriteableEvent.INSTANCE);
+ }
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/DefaultProtocolServerTest.java
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/DefaultProtocolServerTest.java
new file mode 100644
index 0000000000..a51a55aa51
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/DefaultProtocolServerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.url.component.ServiceConfigURL;
+import org.apache.dubbo.remoting.ChannelEvent;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
+
+import java.util.Map;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link DefaultProtocolServer}.
+ */
+class DefaultProtocolServerTest {
+
+ private RemotingServer mockServer;
+ private URL testUrl;
+ private DefaultProtocolServer protocolServer;
+
+ @BeforeEach
+ void setUp() {
+ mockServer = Mockito.mock(RemotingServer.class);
+ testUrl = new ServiceConfigURL("dubbo", "127.0.0.1", 20881);
+ when(mockServer.getUrl()).thenReturn(testUrl);
+ protocolServer = new DefaultProtocolServer(mockServer);
+ }
+
+ /**
+ * Test that getRemotingServer returns the underlying server.
+ */
+ @Test
+ void testGetRemotingServer() {
+ assertSame(mockServer, protocolServer.getRemotingServer());
+ }
+
+ /**
+ * Test that getUrl returns the server's URL.
+ */
+ @Test
+ void testGetUrl() {
+ assertEquals(testUrl, protocolServer.getUrl());
+ }
+
+ /**
+ * Test that getAddress returns the server's address when no custom
address is set.
+ */
+ @Test
+ void testGetAddressFromServer() {
+ assertEquals(testUrl.getAddress(), protocolServer.getAddress());
+ }
+
+ /**
+ * Test that getAddress returns the custom address when set.
+ */
+ @Test
+ void testGetAddressCustom() {
+ String customAddress = "192.168.1.100:8080";
+ protocolServer.setAddress(customAddress);
+ assertEquals(customAddress, protocolServer.getAddress());
+ }
+
+ /**
+ * Test that setAddress updates the address.
+ */
+ @Test
+ void testSetAddress() {
+ String address1 = "192.168.1.100:8080";
+ String address2 = "192.168.1.200:9090";
+
+ protocolServer.setAddress(address1);
+ assertEquals(address1, protocolServer.getAddress());
+
+ protocolServer.setAddress(address2);
+ assertEquals(address2, protocolServer.getAddress());
+ }
+
+ /**
+ * Test that reset delegates to the underlying server.
+ */
+ @Test
+ void testReset() {
+ URL newUrl = new ServiceConfigURL("dubbo", "127.0.0.1", 20882);
+ protocolServer.reset(newUrl);
+ verify(mockServer, times(1)).reset(newUrl);
+ }
+
+ /**
+ * Test that close delegates to the underlying server.
+ */
+ @Test
+ void testClose() {
+ protocolServer.close();
+ verify(mockServer, times(1)).close();
+ }
+
+ /**
+ * Test that getAttributes returns a non-null map.
+ */
+ @Test
+ void testGetAttributesNotNull() {
+ Map<String, Object> attributes = protocolServer.getAttributes();
+ assertNotNull(attributes);
+ }
+
+ /**
+ * Test that attributes can be stored and retrieved.
+ */
+ @Test
+ void testAttributesStorage() {
+ Map<String, Object> attributes = protocolServer.getAttributes();
+
+ attributes.put("key1", "value1");
+ attributes.put("key2", 42);
+ attributes.put("key3", true);
+
+ assertEquals("value1", attributes.get("key1"));
+ assertEquals(42, attributes.get("key2"));
+ assertEquals(true, attributes.get("key3"));
+ }
+
+ /**
+ * Test that attributes are thread-safe (ConcurrentHashMap).
+ */
+ @Test
+ void testAttributesAreSameInstance() {
+ Map<String, Object> attributes1 = protocolServer.getAttributes();
+ Map<String, Object> attributes2 = protocolServer.getAttributes();
+
+ assertSame(attributes1, attributes2);
+ }
+
+ /**
+ * Test fireChannelEvent with ReadOnlyEvent delegates to the underlying
server.
+ */
+ @Test
+ void testFireChannelEventDelegation() {
+ ChannelEvent event = ReadOnlyEvent.INSTANCE;
+
+ // Since DefaultProtocolServer doesn't override fireChannelEvent,
+ // we test through getRemotingServer().fireChannelEvent()
+ protocolServer.getRemotingServer().fireChannelEvent(event);
+ verify(mockServer, times(1)).fireChannelEvent(event);
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboGracefulShutdown.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboGracefulShutdown.java
index 707f71e1fe..a7c4f5c5a6 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboGracefulShutdown.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboGracefulShutdown.java
@@ -16,69 +16,76 @@
*/
package org.apache.dubbo.rpc.protocol.dubbo;
-import org.apache.dubbo.common.Version;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.remoting.Channel;
-import org.apache.dubbo.remoting.Constants;
-import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.rpc.GracefulShutdown;
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
+import org.apache.dubbo.remoting.event.WriteableEvent;
+import org.apache.dubbo.rpc.AbstractGracefulShutdown;
import org.apache.dubbo.rpc.ProtocolServer;
-import java.nio.channels.ClosedChannelException;
import java.util.Collection;
-import static org.apache.dubbo.common.constants.CommonConstants.READONLY_EVENT;
-import static
org.apache.dubbo.common.constants.CommonConstants.WRITEABLE_EVENT;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE_STREAM;
+/**
+ * Dubbo protocol graceful shutdown implementation.
+ * <p>
+ * For Dubbo protocol, graceful shutdown sends READONLY_EVENT to all connected
clients,
+ * telling them that the server is going to shut down and they should switch
to other providers.
+ * </p>
+ * <p>
+ * Dubbo protocol also supports WRITEABLE_EVENT to resume normal operation if
graceful shutdown
+ * is cancelled.
+ * </p>
+ */
+public class DubboGracefulShutdown extends AbstractGracefulShutdown {
-public class DubboGracefulShutdown implements GracefulShutdown {
- private static final ErrorTypeAwareLogger logger =
- LoggerFactory.getErrorTypeAwareLogger(DubboGracefulShutdown.class);
+ /**
+ * Reference to the Dubbo protocol instance for accessing servers.
+ */
private final DubboProtocol dubboProtocol;
+ /**
+ * Create a new DubboGracefulShutdown instance.
+ *
+ * @param dubboProtocol the Dubbo protocol instance, must not be null
+ */
public DubboGracefulShutdown(DubboProtocol dubboProtocol) {
this.dubboProtocol = dubboProtocol;
}
+ /**
+ * {@inheritDoc}
+ *
+ * @return all active Dubbo protocol servers
+ */
@Override
- public void readonly() {
- sendEvent(READONLY_EVENT);
+ protected Collection<ProtocolServer> getServers() {
+ return dubboProtocol.getServers();
}
+ /**
+ * Enter read-only mode by sending READONLY_EVENT to all connected clients.
+ * <p>
+ * For Dubbo protocol, this fires a {@link ReadOnlyEvent} which triggers
the sending
+ * of READONLY_EVENT requests to all connected clients. Clients receiving
this event
+ * will mark this provider as unavailable and prefer other providers for
new requests.
+ * </p>
+ */
@Override
- public void writeable() {
- sendEvent(WRITEABLE_EVENT);
+ public void readonly() {
+ fireChannelEvent(ReadOnlyEvent.INSTANCE);
}
- private void sendEvent(String event) {
- try {
- for (ProtocolServer server : dubboProtocol.getServers()) {
- Collection<Channel> channels =
server.getRemotingServer().getChannels();
- Request request = new Request();
- request.setEvent(event);
- request.setTwoWay(false);
- request.setVersion(Version.getProtocolVersion());
-
- for (Channel channel : channels) {
- try {
- if (channel.isConnected()) {
- channel.send(
- request,
-
channel.getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY, true));
- }
- } catch (RemotingException e) {
- if (e.getCause() instanceof ClosedChannelException) {
- // ignore ClosedChannelException which means the
connection has been closed.
- continue;
- }
- logger.warn(TRANSPORT_FAILED_CLOSE_STREAM, "", "",
"send cannot write message error.", e);
- }
- }
- }
- } catch (Throwable e) {
- logger.warn(TRANSPORT_FAILED_CLOSE_STREAM, "", "", "send cannot
write message error.", e);
- }
+ /**
+ * Resume normal operation by sending WRITEABLE_EVENT to all connected
clients.
+ * <p>
+ * For Dubbo protocol, this fires a {@link WriteableEvent} which triggers
the sending
+ * of WRITEABLE_EVENT requests to all connected clients. Clients receiving
this event
+ * will mark this provider as available again and can use it for new
requests.
+ * </p>
+ * <p>
+ * This is useful when a graceful shutdown is cancelled before completion.
+ * </p>
+ */
+ @Override
+ public void writeable() {
+ fireChannelEvent(WriteableEvent.INSTANCE);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 4a919adf25..fda55e5667 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -35,6 +35,7 @@ import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import org.apache.dubbo.remoting.utils.UrlUtils;
+import org.apache.dubbo.rpc.DefaultProtocolServer;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -426,7 +427,7 @@ public class DubboProtocol extends AbstractProtocol {
throw new RpcException("Unsupported client type: " + transporter);
}
- DubboProtocolServer protocolServer = new DubboProtocolServer(server);
+ DefaultProtocolServer protocolServer = new
DefaultProtocolServer(server);
loadServerProperties(protocolServer);
return protocolServer;
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboGracefulShutdownTest.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboGracefulShutdownTest.java
new file mode 100644
index 0000000000..b08d02ced9
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboGracefulShutdownTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo;
+
+import org.apache.dubbo.remoting.ChannelEvent;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
+import org.apache.dubbo.remoting.event.WriteableEvent;
+import org.apache.dubbo.rpc.GracefulShutdown;
+import org.apache.dubbo.rpc.ProtocolServer;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link DubboGracefulShutdown}.
+ */
+class DubboGracefulShutdownTest {
+
+ private DubboProtocol mockDubboProtocol;
+ private ProtocolServer mockServer1;
+ private ProtocolServer mockServer2;
+ private RemotingServer mockRemotingServer1;
+ private RemotingServer mockRemotingServer2;
+
+ @BeforeEach
+ void setUp() {
+ mockDubboProtocol = Mockito.mock(DubboProtocol.class);
+ mockServer1 = Mockito.mock(ProtocolServer.class);
+ mockServer2 = Mockito.mock(ProtocolServer.class);
+ mockRemotingServer1 = Mockito.mock(RemotingServer.class);
+ mockRemotingServer2 = Mockito.mock(RemotingServer.class);
+
+ when(mockServer1.getRemotingServer()).thenReturn(mockRemotingServer1);
+ when(mockServer2.getRemotingServer()).thenReturn(mockRemotingServer2);
+ }
+
+ /**
+ * Test that DubboGracefulShutdown implements GracefulShutdown.
+ */
+ @Test
+ void testImplementsGracefulShutdown() {
+ DubboGracefulShutdown shutdown = new
DubboGracefulShutdown(mockDubboProtocol);
+ assertTrue(shutdown instanceof GracefulShutdown);
+ }
+
+ /**
+ * Test readonly sends ReadOnlyEvent to all servers.
+ */
+ @Test
+ void testReadonlySendsReadOnlyEvent() {
+
when(mockDubboProtocol.getServers()).thenReturn(Collections.singletonList(mockServer1));
+
+ DubboGracefulShutdown shutdown = new
DubboGracefulShutdown(mockDubboProtocol);
+ shutdown.readonly();
+
+ ArgumentCaptor<ChannelEvent> captor =
ArgumentCaptor.forClass(ChannelEvent.class);
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(captor.capture());
+
+ ChannelEvent capturedEvent = captor.getValue();
+ assertTrue(capturedEvent instanceof ReadOnlyEvent);
+ assertSame(ReadOnlyEvent.INSTANCE, capturedEvent);
+ }
+
+ /**
+ * Test writeable sends WriteableEvent to all servers.
+ */
+ @Test
+ void testWriteableSendsWriteableEvent() {
+
when(mockDubboProtocol.getServers()).thenReturn(Collections.singletonList(mockServer1));
+
+ DubboGracefulShutdown shutdown = new
DubboGracefulShutdown(mockDubboProtocol);
+ shutdown.writeable();
+
+ ArgumentCaptor<ChannelEvent> captor =
ArgumentCaptor.forClass(ChannelEvent.class);
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(captor.capture());
+
+ ChannelEvent capturedEvent = captor.getValue();
+ assertTrue(capturedEvent instanceof WriteableEvent);
+ assertSame(WriteableEvent.INSTANCE, capturedEvent);
+ }
+
+ /**
+ * Test readonly sends event to multiple servers.
+ */
+ @Test
+ void testReadonlyMultipleServers() {
+ List<ProtocolServer> servers = Arrays.asList(mockServer1, mockServer2);
+ when(mockDubboProtocol.getServers()).thenReturn(servers);
+
+ DubboGracefulShutdown shutdown = new
DubboGracefulShutdown(mockDubboProtocol);
+ shutdown.readonly();
+
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ verify(mockRemotingServer2,
times(1)).fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ }
+
+ /**
+ * Test writeable sends event to multiple servers.
+ */
+ @Test
+ void testWriteableMultipleServers() {
+ List<ProtocolServer> servers = Arrays.asList(mockServer1, mockServer2);
+ when(mockDubboProtocol.getServers()).thenReturn(servers);
+
+ DubboGracefulShutdown shutdown = new
DubboGracefulShutdown(mockDubboProtocol);
+ shutdown.writeable();
+
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(WriteableEvent.INSTANCE);
+ verify(mockRemotingServer2,
times(1)).fireChannelEvent(WriteableEvent.INSTANCE);
+ }
+
+ /**
+ * Test with empty server list.
+ */
+ @Test
+ void testEmptyServerList() {
+
when(mockDubboProtocol.getServers()).thenReturn(Collections.emptyList());
+
+ DubboGracefulShutdown shutdown = new
DubboGracefulShutdown(mockDubboProtocol);
+
+ // Should not throw exception
+ shutdown.readonly();
+ shutdown.writeable();
+ }
+
+ /**
+ * Test that getServers returns the protocol's servers.
+ */
+ @Test
+ void testGetServersReturnsProtocolServers() {
+ List<ProtocolServer> expectedServers = Arrays.asList(mockServer1,
mockServer2);
+ when(mockDubboProtocol.getServers()).thenReturn(expectedServers);
+
+ DubboGracefulShutdown shutdown = new
DubboGracefulShutdown(mockDubboProtocol);
+
+ // Trigger readonly to indirectly verify getServers is called
+ shutdown.readonly();
+
+ verify(mockDubboProtocol, times(1)).getServers();
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleGracefulShutdown.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleGracefulShutdown.java
new file mode 100644
index 0000000000..b97c6175e2
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleGracefulShutdown.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
+import org.apache.dubbo.rpc.AbstractGracefulShutdown;
+import org.apache.dubbo.rpc.ProtocolServer;
+
+import java.util.Collection;
+
+/**
+ * Triple protocol graceful shutdown implementation.
+ * <p>
+ * For Triple protocol (HTTP/2), graceful shutdown sends GOAWAY frames to all
connected clients,
+ * telling them not to send new requests. Existing streams can continue until
completion.
+ * </p>
+ * <p>
+ * Triple protocol does not support writeable event because GOAWAY is a
one-way notification
+ * that cannot be reversed. Once a GOAWAY frame is sent, the connection is in
graceful shutdown mode.
+ * </p>
+ */
+public class TripleGracefulShutdown extends AbstractGracefulShutdown {
+
+ /**
+ * Reference to the Triple protocol instance for accessing servers.
+ */
+ private final TripleProtocol tripleProtocol;
+
+ /**
+ * Create a new TripleGracefulShutdown instance.
+ *
+ * @param tripleProtocol the Triple protocol instance, must not be null
+ */
+ public TripleGracefulShutdown(TripleProtocol tripleProtocol) {
+ this.tripleProtocol = tripleProtocol;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return all active Triple protocol servers
+ */
+ @Override
+ protected Collection<ProtocolServer> getServers() {
+ return tripleProtocol.getServers();
+ }
+
+ /**
+ * Enter read-only mode by sending GOAWAY frames to all connected clients.
+ * <p>
+ * For Triple protocol (HTTP/2), this fires a {@link ReadOnlyEvent} which
triggers
+ * the sending of GOAWAY frames. The GOAWAY frame with NO_ERROR code tells
clients
+ * that the server will not accept new streams but existing streams can
continue.
+ * </p>
+ */
+ @Override
+ public void readonly() {
+ fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ }
+
+ /**
+ * Resume normal operation (not supported for Triple protocol).
+ * <p>
+ * Triple protocol (HTTP/2) doesn't support writeable event because GOAWAY
is a one-way
+ * notification that cannot be reversed. Once a GOAWAY frame is sent, the
connection
+ * is in graceful shutdown mode and the only way to resume is to establish
a new connection.
+ * </p>
+ * <p>
+ * This method is intentionally empty and does nothing.
+ * </p>
+ */
+ @Override
+ public void writeable() {
+ // Triple protocol (HTTP/2) doesn't support writeable event
+ // because GOAWAY is a one-way notification that cannot be reversed.
+ // Once a GOAWAY frame is sent, the connection is in graceful shutdown
mode.
+ logger.info("writeable() is not supported for Triple protocol
(HTTP/2). GOAWAY cannot be reversed.");
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 5bc020a3d1..dba35a20a3 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.SystemPropertyConfigUtils;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
@@ -352,7 +353,13 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
if (!super.isAvailable()) {
return false;
}
- return connectionClient.isConnected();
+ if (!connectionClient.isConnected()) {
+ return false;
+ }
+ if
(connectionClient.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
+ return false;
+ }
+ return true;
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 9d9b3ca328..017bef9053 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -20,12 +20,15 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.Configuration;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
+import org.apache.dubbo.rpc.DefaultProtocolServer;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.PathResolver;
@@ -91,6 +94,8 @@ public class TripleProtocol extends AbstractProtocol {
ServletExchanger.init(globalConf);
Http3Exchanger.init(globalConf);
+
+ this.frameworkModel.getBeanFactory().registerBean(new
TripleGracefulShutdown(this));
}
@Override
@@ -175,7 +180,11 @@ public class TripleProtocol extends AbstractProtocol {
}
if (bindPort) {
- PortUnificationExchanger.bind(url, new DefaultPuHandler());
+ String addr = url.getAddress();
+ ConcurrentHashMapUtils.computeIfAbsent(serverMap, addr, k -> {
+ RemotingServer remotingServer =
PortUnificationExchanger.bind(url, new DefaultPuHandler());
+ return new DefaultProtocolServer(remotingServer);
+ });
}
Http3Exchanger.bind(url);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
index 5e86b592c0..8f9b1cd0e5 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
@@ -30,26 +30,124 @@ import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2PingFrame;
import io.netty.util.concurrent.Future;
+/**
+ * HTTP/2 graceful shutdown handler for Triple protocol.
+ * <p>
+ * This class implements the HTTP/2 graceful shutdown mechanism as defined in
RFC 7540.
+ * The graceful shutdown process consists of:
+ * </p>
+ * <ol>
+ * <li><b>First GOAWAY:</b> Send a GOAWAY frame with last-stream-id set to
MAX_VALUE,
+ * indicating that no streams will be rejected immediately.</li>
+ * <li><b>PING frame:</b> Send a PING frame to ensure the first GOAWAY has
been received.</li>
+ * <li><b>Timeout:</b> Wait for PING ACK or timeout (10 seconds by
default).</li>
+ * <li><b>Second GOAWAY:</b> Send a final GOAWAY with the actual last
processed stream ID.</li>
+ * <li><b>Close:</b> Close the connection.</li>
+ * </ol>
+ *
+ * <h3>Usage</h3>
+ * <p>There are two ways to use this class:</p>
+ * <ul>
+ * <li><b>Static method:</b> Use {@link
#sendGoAwayFrame(ChannelHandlerContext)} to only send a GOAWAY
+ * frame without closing the connection. This is used for read-only
events during graceful shutdown.</li>
+ * <li><b>Instance method:</b> Create an instance and call {@link
#gracefulShutdown()} to perform
+ * the full graceful shutdown process including connection close.</li>
+ * </ul>
+ *
+ * @see <a href="https://httpwg.org/specs/rfc7540.html#GOAWAY">RFC 7540 -
GOAWAY</a>
+ * @since 3.3
+ */
public class GracefulShutdown {
+
+ /**
+ * Magic value for graceful shutdown PING frame.
+ * Used to identify the PING frame sent during graceful shutdown.
+ */
static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
+
+ /**
+ * Timeout for waiting PING ACK during graceful shutdown (10 seconds).
+ */
private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS =
TimeUnit.SECONDS.toNanos(10);
+
+ /**
+ * Message sent in the GOAWAY frame for read-only events.
+ */
+ private static final String READONLY_GOAWAY_MESSAGE = "server_readonly";
+
+ /**
+ * Channel handler context for writing frames.
+ */
private final ChannelHandlerContext ctx;
+
+ /**
+ * Original promise to complete when graceful shutdown is done.
+ */
private final ChannelPromise originPromise;
+
+ /**
+ * Custom message to include in the final GOAWAY frame.
+ */
private final String goAwayMessage;
+
+ /**
+ * Flag indicating whether PING ACK has been received or timeout has
occurred.
+ */
private boolean pingAckedOrTimeout;
+
+ /**
+ * Scheduled future for the PING timeout.
+ */
private Future<?> pingFuture;
+ /**
+ * Create a new GracefulShutdown instance.
+ *
+ * @param ctx the channel handler context
+ * @param goAwayMessage the message to include in the final GOAWAY frame
+ * @param originPromise the promise to complete when shutdown is done
+ */
public GracefulShutdown(ChannelHandlerContext ctx, String goAwayMessage,
ChannelPromise originPromise) {
this.ctx = ctx;
this.goAwayMessage = goAwayMessage;
this.originPromise = originPromise;
}
- public void gracefulShutdown() {
- Http2GoAwayFrame goAwayFrame =
- new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR,
ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage));
+ /**
+ * Send GOAWAY frame to notify the client that the server is going to
shutdown.
+ * This method only sends the GOAWAY frame without closing the connection.
+ * <p>
+ * The GOAWAY frame with extraStreamIds set to Integer.MAX_VALUE indicates
that
+ * the server will not accept new streams but existing streams can
continue.
+ * </p>
+ *
+ * @param ctx the channel handler context
+ */
+ public static void sendGoAwayFrame(ChannelHandlerContext ctx) {
+ Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(
+ Http2Error.NO_ERROR, ByteBufUtil.writeAscii(ctx.alloc(),
READONLY_GOAWAY_MESSAGE));
goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
ctx.writeAndFlush(goAwayFrame);
+ }
+
+ /**
+ * Start the full graceful shutdown process.
+ * <p>
+ * This method performs the following steps:
+ * </p>
+ * <ol>
+ * <li>Send the first GOAWAY frame with extraStreamIds = MAX_VALUE</li>
+ * <li>Schedule a timeout task (10 seconds)</li>
+ * <li>Send a PING frame with magic value to verify client received
GOAWAY</li>
+ * </ol>
+ * <p>
+ * The shutdown continues in {@link
#secondGoAwayAndClose(ChannelHandlerContext)} when
+ * either PING ACK is received or timeout occurs.
+ * </p>
+ */
+ public void gracefulShutdown() {
+ sendGoAwayFrame(ctx);
+
pingFuture = ctx.executor()
.schedule(() -> secondGoAwayAndClose(ctx),
GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
@@ -57,6 +155,18 @@ public class GracefulShutdown {
ctx.writeAndFlush(pingFrame);
}
+ /**
+ * Send the second GOAWAY frame and close the connection.
+ * <p>
+ * This method is called either when PING ACK is received or when the
timeout expires.
+ * It sends a final GOAWAY frame with the custom message and then closes
the connection.
+ * </p>
+ * <p>
+ * This method is idempotent - it will only execute once even if called
multiple times.
+ * </p>
+ *
+ * @param ctx the channel handler context
+ */
void secondGoAwayAndClose(ChannelHandlerContext ctx) {
if (pingAckedOrTimeout) {
return;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleServerConnectionHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleServerConnectionHandler.java
index 06c517208e..6847988e3c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleServerConnectionHandler.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleServerConnectionHandler.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri.transport;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
import java.io.IOException;
import java.net.SocketException;
@@ -95,8 +96,35 @@ public class TripleServerConnectionHandler extends
Http2ChannelDuplexHandler {
return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
}
+ /**
+ * Handle user events triggered on the channel.
+ * <p>
+ * This method specifically handles {@link ReadOnlyEvent} for graceful
shutdown:
+ * </p>
+ * <ul>
+ * <li>When a {@link ReadOnlyEvent} is received, send a GOAWAY frame to
the client
+ * indicating that the server is entering read-only mode and will
not accept new streams.</li>
+ * <li>Other events are delegated to the superclass handler.</li>
+ * </ul>
+ * <p>
+ * Note: Unlike the full graceful shutdown process (triggered by {@code
close()}),
+ * the ReadOnlyEvent only sends a GOAWAY frame without closing the
connection.
+ * This allows existing streams to complete while preventing new streams.
+ * </p>
+ *
+ * @param ctx the channel handler context
+ * @param evt the user event
+ * @throws Exception if an error occurs during event handling
+ */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
+ if (evt instanceof ReadOnlyEvent) {
+ GracefulShutdown.sendGoAwayFrame(ctx);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sent GOAWAY frame for graceful shutdown
(ReadOnlyEvent) on channel: " + ctx.channel());
+ }
+ return;
+ }
super.userEventTriggered(ctx, evt);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleGracefulShutdownTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleGracefulShutdownTest.java
new file mode 100644
index 0000000000..884ad9e10d
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleGracefulShutdownTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri;
+
+import org.apache.dubbo.remoting.ChannelEvent;
+import org.apache.dubbo.remoting.RemotingServer;
+import org.apache.dubbo.remoting.event.ReadOnlyEvent;
+import org.apache.dubbo.remoting.event.WriteableEvent;
+import org.apache.dubbo.rpc.GracefulShutdown;
+import org.apache.dubbo.rpc.ProtocolServer;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link TripleGracefulShutdown}.
+ */
+class TripleGracefulShutdownTest {
+
+ private TripleProtocol mockTripleProtocol;
+ private ProtocolServer mockServer1;
+ private ProtocolServer mockServer2;
+ private RemotingServer mockRemotingServer1;
+ private RemotingServer mockRemotingServer2;
+
+ @BeforeEach
+ void setUp() {
+ mockTripleProtocol = Mockito.mock(TripleProtocol.class);
+ mockServer1 = Mockito.mock(ProtocolServer.class);
+ mockServer2 = Mockito.mock(ProtocolServer.class);
+ mockRemotingServer1 = Mockito.mock(RemotingServer.class);
+ mockRemotingServer2 = Mockito.mock(RemotingServer.class);
+
+ when(mockServer1.getRemotingServer()).thenReturn(mockRemotingServer1);
+ when(mockServer2.getRemotingServer()).thenReturn(mockRemotingServer2);
+ }
+
+ /**
+ * Test that TripleGracefulShutdown implements GracefulShutdown.
+ */
+ @Test
+ void testImplementsGracefulShutdown() {
+ TripleGracefulShutdown shutdown = new
TripleGracefulShutdown(mockTripleProtocol);
+ assertTrue(shutdown instanceof GracefulShutdown);
+ }
+
+ /**
+ * Test readonly sends ReadOnlyEvent to all servers.
+ */
+ @Test
+ void testReadonlySendsReadOnlyEvent() {
+
when(mockTripleProtocol.getServers()).thenReturn(Collections.singletonList(mockServer1));
+
+ TripleGracefulShutdown shutdown = new
TripleGracefulShutdown(mockTripleProtocol);
+ shutdown.readonly();
+
+ ArgumentCaptor<ChannelEvent> captor =
ArgumentCaptor.forClass(ChannelEvent.class);
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(captor.capture());
+
+ ChannelEvent capturedEvent = captor.getValue();
+ assertTrue(capturedEvent instanceof ReadOnlyEvent);
+ assertSame(ReadOnlyEvent.INSTANCE, capturedEvent);
+ }
+
+ /**
+ * Test writeable does not send any event (not supported for Triple
protocol).
+ */
+ @Test
+ void testWriteableDoesNotSendEvent() {
+
when(mockTripleProtocol.getServers()).thenReturn(Collections.singletonList(mockServer1));
+
+ TripleGracefulShutdown shutdown = new
TripleGracefulShutdown(mockTripleProtocol);
+ shutdown.writeable();
+
+ // Writeable is not supported for Triple protocol, so no event should
be sent
+ verify(mockRemotingServer1,
never()).fireChannelEvent(WriteableEvent.INSTANCE);
+ }
+
+ /**
+ * Test readonly sends event to multiple servers.
+ */
+ @Test
+ void testReadonlyMultipleServers() {
+ List<ProtocolServer> servers = Arrays.asList(mockServer1, mockServer2);
+ when(mockTripleProtocol.getServers()).thenReturn(servers);
+
+ TripleGracefulShutdown shutdown = new
TripleGracefulShutdown(mockTripleProtocol);
+ shutdown.readonly();
+
+ verify(mockRemotingServer1,
times(1)).fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ verify(mockRemotingServer2,
times(1)).fireChannelEvent(ReadOnlyEvent.INSTANCE);
+ }
+
+ /**
+ * Test with empty server list.
+ */
+ @Test
+ void testEmptyServerList() {
+
when(mockTripleProtocol.getServers()).thenReturn(Collections.emptyList());
+
+ TripleGracefulShutdown shutdown = new
TripleGracefulShutdown(mockTripleProtocol);
+
+ // Should not throw exception
+ shutdown.readonly();
+ shutdown.writeable();
+ }
+
+ /**
+ * Test that getServers returns the protocol's servers.
+ */
+ @Test
+ void testGetServersReturnsProtocolServers() {
+ List<ProtocolServer> expectedServers = Arrays.asList(mockServer1,
mockServer2);
+ when(mockTripleProtocol.getServers()).thenReturn(expectedServers);
+
+ TripleGracefulShutdown shutdown = new
TripleGracefulShutdown(mockTripleProtocol);
+
+ // Trigger readonly to indirectly verify getServers is called
+ shutdown.readonly();
+
+ verify(mockTripleProtocol, times(1)).getServers();
+ }
+
+ /**
+ * Test that writeable can be called multiple times without error.
+ */
+ @Test
+ void testWriteableCanBeCalledMultipleTimes() {
+
when(mockTripleProtocol.getServers()).thenReturn(Collections.singletonList(mockServer1));
+
+ TripleGracefulShutdown shutdown = new
TripleGracefulShutdown(mockTripleProtocol);
+
+ // Should not throw exception even when called multiple times
+ shutdown.writeable();
+ shutdown.writeable();
+ shutdown.writeable();
+
+ // No events should be sent
+ verify(mockRemotingServer1,
never()).fireChannelEvent(WriteableEvent.INSTANCE);
+ }
+}