This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bab773  [Feature] CPU load balancing implementation and code structure optimization (#38)
9bab773 is described below

commit 9bab773b7c5caabf6cf147965298f621246daaa0
Author: Zaki <[email protected]>
AuthorDate: Mon Aug 26 09:57:25 2024 +0800

    [Feature] CPU load balancing implementation and code structure optimization (#38)
    
    * feat&fix: Optimize the logic of h2, remove duplicate code, add cpu monitoring
    
    * fix: Optimize some code
---
 dubbo/__version__.py                               |   2 +-
 dubbo/cluster/failfast_cluster.py                  |   6 +-
 dubbo/cluster/loadbalances.py                      |  44 +++
 .../logging => cluster/monitor}/__init__.py        |   4 -
 dubbo/cluster/monitor/cpu.py                       | 176 +++++++++++
 dubbo/common/__init__.py                           |  32 --
 dubbo/common/classes.py                            |  41 ---
 dubbo/common/constants.py                          |  62 ----
 dubbo/common/deliverers.py                         | 314 --------------------
 dubbo/common/node.py                               |  58 ----
 dubbo/common/types.py                              |  22 --
 dubbo/common/url.py                                | 325 ---------------------
 dubbo/common/utils.py                              | 129 --------
 dubbo/config/__init__.py                           |  19 --
 dubbo/config/logger_config.py                      | 150 ----------
 dubbo/config/protocol_config.py                    |  30 --
 dubbo/config/reference_config.py                   |  62 ----
 dubbo/config/service_config.py                     |  71 -----
 dubbo/extension/registries.py                      |   1 +
 dubbo/logger/__init__.py                           |  23 --
 dubbo/logger/_interfaces.py                        | 204 -------------
 dubbo/logger/constants.py                          | 127 --------
 dubbo/logger/logger_factory.py                     | 127 --------
 dubbo/logger/logging/formatter.py                  |  89 ------
 dubbo/logger/logging/logger.py                     |  94 ------
 dubbo/logger/logging/logger_adapter.py             | 186 ------------
 dubbo/protocol/triple/invoker.py                   |   8 +-
 dubbo/protocol/triple/protocol.py                  |  10 +-
 dubbo/registry/protocol.py                         |  16 +-
 dubbo/remoting/aio/http2/controllers.py            |   8 +
 dubbo/remoting/aio/http2/frames.py                 |  15 +-
 dubbo/remoting/aio/http2/protocol.py               |  35 +--
 dubbo/remoting/aio/http2/stream.py                 |  13 +-
 dubbo/remoting/aio/http2/stream_handler.py         |  21 +-
 dubbo/remoting/aio/http2/utils.py                  |  16 +-
 dubbo/utils.py                                     |  74 ++++-
 requirements.txt                                   |   3 +-
 samples/registry/zookeeper/client.py               |  10 +-
 samples/stream/server_stream/client.py             |   1 -
 setup.py                                           |   6 +-
 40 files changed, 394 insertions(+), 2240 deletions(-)

diff --git a/dubbo/__version__.py b/dubbo/__version__.py
index aeae1de..7302839 100644
--- a/dubbo/__version__.py
+++ b/dubbo/__version__.py
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__version__ = "1.0.0b1"
+__version__ = "3.0.0b1"
diff --git a/dubbo/cluster/failfast_cluster.py b/dubbo/cluster/failfast_cluster.py
index 8bfe47a..ee5497e 100644
--- a/dubbo/cluster/failfast_cluster.py
+++ b/dubbo/cluster/failfast_cluster.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 from dubbo.cluster import Cluster, Directory, LoadBalance
+from dubbo.cluster.loadbalances import CpuLoadBalance
 from dubbo.constants import common_constants
 from dubbo.extension import extensionLoader
 from dubbo.protocol import Invoker, Result
@@ -27,13 +28,16 @@ class FailfastInvoker(Invoker):
     FailfastInvoker
     """
 
-    def __init__(self, directory: Directory, url: URL):
+    def __init__(self, directory, url: URL):
         self._directory = directory
 
         self._load_balance = extensionLoader.get_extension(
             LoadBalance, url.parameters.get(common_constants.LOADBALANCE_KEY, "random")
         )()
 
+        if isinstance(self._load_balance, CpuLoadBalance):
+            self._load_balance.set_monitor(directory)
+
     def invoke(self, invocation) -> Result:
 
         # get the invokers
diff --git a/dubbo/cluster/loadbalances.py b/dubbo/cluster/loadbalances.py
index 4b6f0b3..94e2e54 100644
--- a/dubbo/cluster/loadbalances.py
+++ b/dubbo/cluster/loadbalances.py
@@ -18,6 +18,7 @@ import random
 from typing import List, Optional
 
 from dubbo.cluster import LoadBalance
+from dubbo.cluster.monitor.cpu import CpuMonitor
 from dubbo.protocol import Invocation, Invoker
 
 
@@ -63,3 +64,46 @@ class RandomLoadBalance(AbstractLoadBalance):
     ) -> Optional[Invoker]:
         randint = random.randint(0, len(invokers) - 1)
         return invokers[randint]
+
+
+class CpuLoadBalance(AbstractLoadBalance):
+    """
+    CPU load balance.
+    """
+
+    def __init__(self):
+        self._monitor: Optional[CpuMonitor] = None
+
+    def set_monitor(self, monitor: CpuMonitor) -> None:
+        """
+        Set the CPU monitor.
+        :param monitor: The CPU monitor.
+        :type monitor: CpuMonitor
+        """
+        self._monitor = monitor
+
+    def do_select(
+        self, invokers: List[Invoker], invocation: Invocation
+    ) -> Optional[Invoker]:
+        # get the CPU usage
+        cpu_usages = self._monitor.get_cpu_usage(invokers)
+        # Select the caller with the lowest CPU usage, 0 means CPU usage is unknown.
+        available_invokers = []
+        unknown_invokers = []
+
+        for invoker, cpu_usage in cpu_usages.items():
+            if cpu_usage == 0:
+                unknown_invokers.append((cpu_usage, invoker))
+            else:
+                available_invokers.append((cpu_usage, invoker))
+
+        if available_invokers:
+            # sort and select the invoker with the lowest CPU usage
+            available_invokers.sort(key=lambda x: x[0])
+            return available_invokers[0][1]
+        elif unknown_invokers:
+            # get the invoker with unknown CPU usage randomly
+            randint = random.randint(0, len(unknown_invokers) - 1)
+            return unknown_invokers[randint][1]
+        else:
+            return None
diff --git a/dubbo/logger/logging/__init__.py b/dubbo/cluster/monitor/__init__.py
similarity index 91%
rename from dubbo/logger/logging/__init__.py
rename to dubbo/cluster/monitor/__init__.py
index 10e45eb..bcba37a 100644
--- a/dubbo/logger/logging/__init__.py
+++ b/dubbo/cluster/monitor/__init__.py
@@ -13,7 +13,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from .logger_adapter import LoggerAdapter
-
-__all__ = ["LoggerAdapter"]
diff --git a/dubbo/cluster/monitor/cpu.py b/dubbo/cluster/monitor/cpu.py
new file mode 100644
index 0000000..3b3831f
--- /dev/null
+++ b/dubbo/cluster/monitor/cpu.py
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import threading
+from typing import Dict, List
+from dubbo.cluster.directories import RegistryDirectory
+from dubbo.constants import common_constants
+from dubbo.loggers import loggerFactory
+from dubbo.protocol import Protocol, Invoker
+from dubbo.protocol.invocation import RpcInvocation
+from dubbo.proxy.handlers import RpcServiceHandler, RpcMethodHandler
+from dubbo.registry import Registry
+from dubbo.types import UnaryCallType
+from dubbo.url import URL
+from dubbo.utils import CpuUtils
+
+_LOGGER = loggerFactory.get_logger()
+
+_cpu_invocation = RpcInvocation(
+    "org.apache.dubbo.MetricsService",
+    "cpu",
+    str(1).encode("utf-8"),
+    attributes={
+        common_constants.CALL_KEY: UnaryCallType,
+    },
+)
+
+
+class CpuMonitor(RegistryDirectory):
+    """
+    The CPU monitor.
+    """
+
+    def __init__(self, registry: Registry, protocol: Protocol, url: URL):
+        super().__init__(registry, protocol, url)
+
+        # interval
+        self._interval = 5
+
+        # about CPU usage
+        self._usages_lock = threading.Lock()
+        self._cpu_usages: Dict[Invoker, float] = {}
+
+        # running invokers
+        self._running_invokers: Dict[str, Invoker] = {}
+
+        # thread
+        self._started = False
+        self._thread: threading.Thread = threading.Thread(
+            target=self._monitor_cpu, daemon=True
+        )
+        self._stop_event: threading.Event = threading.Event()
+
+        # start the monitor
+        self.start()
+
+    def start(self) -> None:
+        """
+        Start the monitor.
+        """
+        if self._stop_event.is_set():
+            raise RuntimeError("The monitor has been stopped.")
+        elif self._started:
+            return
+
+        self._started = True
+        self._thread.start()
+        _LOGGER.info("The CPU monitor has been started.")
+
+    def stop(self) -> None:
+        """
+        Stop the monitor.
+        """
+        if self._stop_event.is_set():
+            return
+        # notify the thread to stop
+        self._stop_event.set()
+
+    def _monitor_cpu(self) -> None:
+        """
+        Monitor the CPU usage.
+        """
+        while True:
+            # get available invokers
+            available_invokers = {
+                url: invoker
+                for url, invoker in self._invokers.items()
+                if invoker.is_available()
+            }
+
+            # update the running invokers
+            self._running_invokers = available_invokers
+
+            # update the CPU usage
+            with self._usages_lock:
+                self._cpu_usages = {
+                    invoker: usage
+                    for invoker, usage in self._cpu_usages.items()
+                    if invoker in available_invokers.values()
+                }
+
+            # get the CPU usage for each invoker
+            for url, invoker in self._running_invokers.items():
+                if invoker.is_available():
+                    try:
+                        result = invoker.invoke(_cpu_invocation)
+                        cpu_usage = float(result.value().decode("utf-8"))
+                        self._cpu_usages[invoker] = cpu_usage
+                    except Exception as e:
+                        _LOGGER.error(
+                            f"Failed to get the CPU usage for invoker {url}: {str(e)}"
+                        )
+                        # remove the cpu usage
+                        self._remove_cpu_usage(invoker)
+
+            # wait for the interval or stop
+            if self._stop_event.wait(self._interval):
+                _LOGGER.info("The CPU monitor has been stopped.")
+                break
+
+    def get_cpu_usage(self, invokers: List[Invoker]) -> Dict[Invoker, float]:
+        """
+        Get the CPU usage for the invoker.
+        :param invokers: The invokers.
+        :type invokers: List[Invoker]
+        :return: The CPU usage.
+        :rtype: Dict[Invoker, float]
+        """
+        with self._usages_lock:
+            return {invoker: self._cpu_usages.get(invoker, 0) for invoker in invokers}
+
+    def _remove_cpu_usage(self, invoker: Invoker) -> None:
+        with self._usages_lock:
+            self._cpu_usages.pop(invoker)
+
+
+class CpuInnerRpcHandler:
+    """
+    The CPU inner RPC handler.
+    """
+
+    @staticmethod
+    def get_service_handler() -> RpcServiceHandler:
+        """
+        Get the service handler.
+        :return: The service handler.
+        :rtype: RpcServiceHandler
+        """
+        return RpcServiceHandler(
+            "org.apache.dubbo.MetricsService",
+            {"cpu": RpcMethodHandler.unary(CpuInnerRpcHandler.get_cpu_usage)},
+        )
+
+    @staticmethod
+    def get_cpu_usage(interval) -> bytes:
+        """
+        Get the CPU usage.
+        :param interval: The interval.
+        :type interval: bytes
+        :return: The CPU usage.
+        :rtype: bytes
+        """
+        float_value = CpuUtils.get_total_cpu_usage(interval=int(interval.decode("utf-8")))
+        return str(float_value).encode("utf-8")
diff --git a/dubbo/common/__init__.py b/dubbo/common/__init__.py
deleted file mode 100644
index a860593..0000000
--- a/dubbo/common/__init__.py
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from .classes import SingletonBase
-from .deliverers import MultiMessageDeliverer, SingleMessageDeliverer
-from .node import Node
-from .types import DeserializingFunction, SerializingFunction
-from .url import URL, create_url
-
-__all__ = [
-    "SingleMessageDeliverer",
-    "MultiMessageDeliverer",
-    "URL",
-    "create_url",
-    "Node",
-    "SingletonBase",
-    "DeserializingFunction",
-    "SerializingFunction",
-]
diff --git a/dubbo/common/classes.py b/dubbo/common/classes.py
deleted file mode 100644
index b27c7b9..0000000
--- a/dubbo/common/classes.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-
-__all__ = ["SingletonBase"]
-
-
-class SingletonBase:
-    """
-    Singleton base class. This class ensures that only one instance of a derived class exists.
-
-    This implementation is thread-safe.
-    """
-
-    _instance = None
-    _instance_lock = threading.Lock()
-
-    def __new__(cls, *args, **kwargs):
-        """
-        Create a new instance of the class if it does not exist.
-        """
-        if cls._instance is None:
-            with cls._instance_lock:
-                # double check
-                if cls._instance is None:
-                    cls._instance = super(SingletonBase, cls).__new__(cls)
-        return cls._instance
diff --git a/dubbo/common/constants.py b/dubbo/common/constants.py
deleted file mode 100644
index 33e4f9f..0000000
--- a/dubbo/common/constants.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-PROTOCOL_KEY = "protocol"
-TRIPLE = "triple"
-TRIPLE_SHORT = "tri"
-
-SIDE_KEY = "side"
-SERVER_VALUE = "server"
-CLIENT_VALUE = "client"
-
-METHOD_KEY = "method"
-SERVICE_KEY = "service"
-
-SERVICE_HANDLER_KEY = "service-handler"
-
-GROUP_KEY = "group"
-
-LOCAL_HOST_KEY = "localhost"
-LOCAL_HOST_VALUE = "127.0.0.1"
-DEFAULT_PORT = 50051
-
-SSL_ENABLED_KEY = "ssl-enabled"
-
-SERIALIZATION_KEY = "serialization"
-SERIALIZER_KEY = "serializer"
-DESERIALIZER_KEY = "deserializer"
-
-
-COMPRESSION_KEY = "compression"
-COMPRESSOR_KEY = "compressor"
-DECOMPRESSOR_KEY = "decompressor"
-
-
-TRANSPORTER_KEY = "transporter"
-TRANSPORTER_DEFAULT_VALUE = "aio"
-
-TRUE_VALUE = "true"
-FALSE_VALUE = "false"
-
-CALL_KEY = "call"
-UNARY_CALL_VALUE = "unary"
-CLIENT_STREAM_CALL_VALUE = "client-stream"
-SERVER_STREAM_CALL_VALUE = "server-stream"
-BI_STREAM_CALL_VALUE = "bi-stream"
-
-PATH_SEPARATOR = "/"
-PROTOCOL_SEPARATOR = "://"
-DYNAMIC_KEY = "dynamic"
diff --git a/dubbo/common/deliverers.py b/dubbo/common/deliverers.py
deleted file mode 100644
index 67790ec..0000000
--- a/dubbo/common/deliverers.py
+++ /dev/null
@@ -1,314 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-import enum
-import queue
-import threading
-from typing import Any, Optional
-
-__all__ = ["MessageDeliverer", "SingleMessageDeliverer", "MultiMessageDeliverer"]
-
-
-class DelivererStatus(enum.Enum):
-    """
-    Enumeration for deliverer status.
-
-    Possible statuses:
-        - PENDING: The deliverer is pending action.
-        - COMPLETED: The deliverer has completed the action.
-        - CANCELLED: The action for the deliverer has been cancelled.
-        - FINISHED: The deliverer has finished all actions and is in a final state.
-    """
-
-    PENDING = 0
-    COMPLETED = 1
-    CANCELLED = 2
-    FINISHED = 3
-
-    @classmethod
-    def change_allowed(
-        cls, current_status: "DelivererStatus", target_status: "DelivererStatus"
-    ) -> bool:
-        """
-        Check if a transition from `current_status` to `target_status` is allowed.
-
-        :param current_status: The current status of the deliverer.
-        :type current_status: DelivererStatus
-        :param target_status: The target status to transition to.
-        :type target_status: DelivererStatus
-        :return: A boolean indicating if the transition is allowed.
-        :rtype: bool
-        """
-        # PENDING -> COMPLETED or CANCELLED
-        if current_status == cls.PENDING:
-            return target_status in {cls.COMPLETED, cls.CANCELLED}
-
-        # COMPLETED -> FINISHED or CANCELLED
-        elif current_status == cls.COMPLETED:
-            return target_status in {cls.FINISHED, cls.CANCELLED}
-
-        # CANCELLED -> FINISHED
-        elif current_status == cls.CANCELLED:
-            return target_status == cls.FINISHED
-
-        # FINISHED is the final state, no further transitions allowed
-        else:
-            return False
-
-
-class NoMoreMessageError(RuntimeError):
-    """
-    Exception raised when no more messages are available.
-    """
-
-    def __init__(self, message: str = "No more message"):
-        super().__init__(message)
-
-
-class EmptyMessageError(RuntimeError):
-    """
-    Exception raised when the message is empty.
-    """
-
-    def __init__(self, message: str = "Message is empty"):
-        super().__init__(message)
-
-
-class MessageDeliverer(abc.ABC):
-    """
-    Abstract base class for message deliverers.
-    """
-
-    __slots__ = ["_status"]
-
-    def __init__(self):
-        self._status = DelivererStatus.PENDING
-
-    @abc.abstractmethod
-    def add(self, message: Any) -> None:
-        """
-        Add a message to the deliverer.
-
-        :param message: The message to be added.
-        :type message: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def complete(self, message: Any = None) -> None:
-        """
-        Mark the message delivery as complete.
-
-        :param message: The last message (optional).
-        :type message: Any, optional
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def cancel(self, exc: Optional[Exception]) -> None:
-        """
-        Cancel the message delivery.
-
-        :param exc: The exception that caused the cancellation.
-        :type exc: Exception, optional
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def get(self) -> Any:
-        """
-        Get the next message.
-
-        :return: The next message.
-        :rtype: Any
-        :raises NoMoreMessageError: If no more messages are available.
-        :raises Exception: If the message delivery is cancelled.
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def get_nowait(self) -> Any:
-        """
-        Get the next message without waiting.
-
-        :return: The next message.
-        :rtype: Any
-        :raises EmptyMessageError: If the message is empty.
-        :raises NoMoreMessageError: If no more messages are available.
-        :raises Exception: If the message delivery is cancelled.
-        """
-        raise NotImplementedError()
-
-
-class SingleMessageDeliverer(MessageDeliverer):
-    """
-    Message deliverer for a single message using a signal-based approach.
-    """
-
-    __slots__ = ["_condition", "_message"]
-
-    def __init__(self):
-        super().__init__()
-        self._condition = threading.Condition()
-        self._message: Any = None
-
-    def add(self, message: Any) -> None:
-        with self._condition:
-            if self._status is DelivererStatus.PENDING:
-                # Add the message
-                self._message = message
-
-    def complete(self, message: Any = None) -> None:
-        with self._condition:
-            if DelivererStatus.change_allowed(self._status, DelivererStatus.COMPLETED):
-                if message is not None:
-                    self._message = message
-                # update the status
-                self._status = DelivererStatus.COMPLETED
-                self._condition.notify_all()
-
-    def cancel(self, exc: Optional[Exception]) -> None:
-        with self._condition:
-            if DelivererStatus.change_allowed(self._status, DelivererStatus.CANCELLED):
-                # Cancel the delivery
-                self._message = exc or RuntimeError("delivery cancelled.")
-                self._status = DelivererStatus.CANCELLED
-                self._condition.notify_all()
-
-    def get(self) -> Any:
-        with self._condition:
-            if self._status is DelivererStatus.FINISHED:
-                raise NoMoreMessageError("Message already consumed.")
-
-            if self._status is DelivererStatus.PENDING:
-                # If the message is not available, wait
-                self._condition.wait()
-
-            # check the status
-            if self._status is DelivererStatus.CANCELLED:
-                raise self._message
-
-            self._status = DelivererStatus.FINISHED
-            return self._message
-
-    def get_nowait(self) -> Any:
-        with self._condition:
-            if self._status is DelivererStatus.FINISHED:
-                self._status = DelivererStatus.PENDING
-                return self._message
-
-            # raise error
-            if self._status is DelivererStatus.FINISHED:
-                raise NoMoreMessageError("Message already consumed.")
-            elif self._status is DelivererStatus.CANCELLED:
-                raise self._message
-            elif self._status is DelivererStatus.PENDING:
-                raise EmptyMessageError("Message is empty")
-
-
-class MultiMessageDeliverer(MessageDeliverer):
-    """
-    Message deliverer supporting multiple messages.
-    """
-
-    __slots__ = ["_lock", "_counter", "_messages", "_END_SENTINEL"]
-
-    def __init__(self):
-        super().__init__()
-        self._lock = threading.Lock()
-        self._counter = 0
-        self._messages: queue.PriorityQueue[Any] = queue.PriorityQueue()
-        self._END_SENTINEL = object()
-
-    def add(self, message: Any) -> None:
-        with self._lock:
-            if self._status is DelivererStatus.PENDING:
-                # Add the message
-                self._counter += 1
-                self._messages.put_nowait((self._counter, message))
-
-    def complete(self, message: Any = None) -> None:
-        with self._lock:
-            if DelivererStatus.change_allowed(self._status, DelivererStatus.COMPLETED):
-                if message is not None:
-                    self._counter += 1
-                    self._messages.put_nowait((self._counter, message))
-
-                # Add the end sentinel
-                self._counter += 1
-                self._messages.put_nowait((self._counter, self._END_SENTINEL))
-                self._status = DelivererStatus.COMPLETED
-
-    def cancel(self, exc: Optional[Exception]) -> None:
-        with self._lock:
-            if DelivererStatus.change_allowed(self._status, DelivererStatus.CANCELLED):
-                # Set the priority to -1 -> make sure it is the first message
-                self._messages.put_nowait(
-                    (-1, exc or RuntimeError("delivery cancelled."))
-                )
-                self._status = DelivererStatus.CANCELLED
-
-    def get(self) -> Any:
-        if self._status is DelivererStatus.FINISHED:
-            raise NoMoreMessageError("No more message")
-
-        # block until the message is available
-        priority, message = self._messages.get()
-
-        # check the status
-        if self._status is DelivererStatus.CANCELLED:
-            raise message
-        elif message is self._END_SENTINEL:
-            self._status = DelivererStatus.FINISHED
-            raise NoMoreMessageError("No more message")
-        else:
-            return message
-
-    def get_nowait(self) -> Any:
-        try:
-            if self._status is DelivererStatus.FINISHED:
-                raise NoMoreMessageError("No more message")
-
-            priority, message = self._messages.get_nowait()
-
-            # check the status
-            if self._status is DelivererStatus.CANCELLED:
-                raise message
-            elif message is self._END_SENTINEL:
-                self._status = DelivererStatus.FINISHED
-                raise NoMoreMessageError("No more message")
-            else:
-                return message
-        except queue.Empty:
-            raise EmptyMessageError("Message is empty")
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        """
-        Returns the next request from the queue.
-
-        :return: The next message.
-        :rtype: Any
-        :raises StopIteration: If no more messages are available.
-        """
-        while True:
-            try:
-                return self.get()
-            except NoMoreMessageError:
-                raise StopIteration
diff --git a/dubbo/common/node.py b/dubbo/common/node.py
deleted file mode 100644
index a5ec339..0000000
--- a/dubbo/common/node.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-
-from dubbo.common.url import URL
-
-__all__ = ["Node"]
-
-
-class Node(abc.ABC):
-    """
-    Abstract base class for a Node.
-    """
-
-    @abc.abstractmethod
-    def get_url(self) -> URL:
-        """
-        Get the URL of the node.
-
-        :return: The URL of the node.
-        :rtype: URL
-        :raises NotImplementedError: If the method is not implemented.
-        """
-        raise NotImplementedError("get_url() is not implemented.")
-
-    @abc.abstractmethod
-    def is_available(self) -> bool:
-        """
-        Check if the node is available.
-
-        :return: True if the node is available, False otherwise.
-        :rtype: bool
-        :raises NotImplementedError: If the method is not implemented.
-        """
-        raise NotImplementedError("is_available() is not implemented.")
-
-    @abc.abstractmethod
-    def destroy(self) -> None:
-        """
-        Destroy the node.
-
-        :raises NotImplementedError: If the method is not implemented.
-        """
-        raise NotImplementedError("destroy() is not implemented.")
diff --git a/dubbo/common/types.py b/dubbo/common/types.py
deleted file mode 100644
index 029b837..0000000
--- a/dubbo/common/types.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Any, Callable
-
-__all__ = ["SerializingFunction", "DeserializingFunction"]
-
-SerializingFunction = Callable[[Any], bytes]
-DeserializingFunction = Callable[[bytes], Any]
diff --git a/dubbo/common/url.py b/dubbo/common/url.py
deleted file mode 100644
index 581fd84..0000000
--- a/dubbo/common/url.py
+++ /dev/null
@@ -1,325 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-from typing import Any, Dict, Optional
-from urllib import parse
-from urllib.parse import urlencode, urlunparse
-
-from dubbo.common.constants import PROTOCOL_SEPARATOR
-
-__all__ = ["URL", "create_url"]
-
-
-def create_url(url: str, encoded: bool = False) -> "URL":
-    """
-    Creates a URL object from a URL string.
-
-    This function takes a URL string and converts it into a URL object.
-    If the 'encoded' parameter is set to True, the URL string will be decoded before being converted.
-
-    :param url: The URL string to be converted into a URL object.
-    :type url: str
-    :param encoded: Determines if the URL string should be decoded before being converted. Defaults to False.
-    :type encoded: bool
-    :return: A URL object.
-    :rtype: URL
-    :raises ValueError: If the URL format is invalid.
-    """
-    # If the URL is encoded, decode it
-    if encoded:
-        url = parse.unquote(url)
-
-    if PROTOCOL_SEPARATOR not in url:
-        raise ValueError("Invalid URL format: missing protocol")
-
-    parsed_url = parse.urlparse(url)
-
-    if not parsed_url.scheme:
-        raise ValueError("Invalid URL format: missing scheme.")
-
-    return URL(
-        parsed_url.scheme,
-        parsed_url.hostname or "",
-        parsed_url.port,
-        parsed_url.username or "",
-        parsed_url.password or "",
-        parsed_url.path.lstrip("/"),
-        {k: v[0] for k, v in parse.parse_qs(parsed_url.query).items()},
-    )
-
-
-class URL:
-    """
-    URL - Uniform Resource Locator.
-    """
-
-    __slots__ = [
-        "_scheme",
-        "_host",
-        "_port",
-        "_location",
-        "_username",
-        "_password",
-        "_path",
-        "_parameters",
-        "_attributes",
-    ]
-
-    def __init__(
-        self,
-        scheme: str,
-        host: str,
-        port: Optional[int] = None,
-        username: str = "",
-        password: str = "",
-        path: str = "",
-        parameters: Optional[Dict[str, str]] = None,
-        attributes: Optional[Dict[str, Any]] = None,
-    ):
-        """
-        Initialize the URL object.
-
-        :param scheme: The scheme of the URL (e.g., 'http', 'https').
-        :type scheme: str
-        :param host: The host of the URL.
-        :type host: str
-        :param port: The port number of the URL, defaults to None.
-        :type port: int, optional
-        :param username: The username for authentication, defaults to an empty string.
-        :type username: str, optional
-        :param password: The password for authentication, defaults to an empty string.
-        :type password: str, optional
-        :param path: The path of the URL, defaults to an empty string.
-        :type path: str, optional
-        :param parameters: The query parameters of the URL as a dictionary, defaults to None.
-        :type parameters: Dict[str, str], optional
-        :param attributes: Additional attributes of the URL as a dictionary, defaults to None.
-        :type attributes: Dict[str, Any], optional
-        """
-        self._scheme = scheme
-        self._host = host
-        self._port = port
-        self._location = f"{host}:{port}" if port else host
-        self._username = username
-        self._password = password
-        self._path = path
-        self._parameters = parameters or {}
-        self._attributes = attributes or {}
-
-    @property
-    def scheme(self) -> str:
-        """
-        Get or set the scheme of the URL.
-
-        :return: The scheme of the URL.
-        :rtype: str
-        """
-        return self._scheme
-
-    @scheme.setter
-    def scheme(self, value: str):
-        self._scheme = value
-
-    @property
-    def host(self) -> str:
-        """
-        Get or set the host of the URL.
-
-        :return: The host of the URL.
-        :rtype: str
-        """
-        return self._host
-
-    @host.setter
-    def host(self, value: str):
-        self._host = value
-        self._location = f"{self.host}:{self.port}" if self.port else self.host
-
-    @property
-    def port(self) -> Optional[int]:
-        """
-        Get or set the port of the URL.
-
-        :return: The port of the URL.
-        :rtype: int, optional
-        """
-        return self._port
-
-    @port.setter
-    def port(self, value: int):
-        if value > 0:
-            self._port = value
-            self._location = f"{self.host}:{self.port}"
-
-    @property
-    def location(self) -> str:
-        """
-        Get or set the location (host:port) of the URL.
-
-        :return: The location of the URL.
-        :rtype: str
-        """
-        return self._location
-
-    @location.setter
-    def location(self, value: str):
-        try:
-            values = value.split(":")
-            self.host = values[0]
-            if len(values) == 2:
-                self.port = int(values[1])
-        except Exception as e:
-            raise ValueError(f"Invalid location: {value}") from e
-
-    @property
-    def username(self) -> str:
-        """
-        Get or set the username for authentication.
-
-        :return: The username.
-        :rtype: str
-        """
-        return self._username
-
-    @username.setter
-    def username(self, value: str):
-        self._username = value
-
-    @property
-    def password(self) -> str:
-        """
-        Get or set the password for authentication.
-
-        :return: The password.
-        :rtype: str
-        """
-        return self._password
-
-    @password.setter
-    def password(self, value: str):
-        self._password = value
-
-    @property
-    def path(self) -> str:
-        """
-        Get or set the path of the URL.
-
-        :return: The path of the URL.
-        :rtype: str
-        """
-        return self._path
-
-    @path.setter
-    def path(self, value: str):
-        self._path = value.lstrip("/")
-
-    @property
-    def parameters(self) -> Dict[str, str]:
-        """
-        Get the query parameters of the URL.
-
-        :return: The query parameters as a dictionary.
-        :rtype: Dict[str, str]
-        """
-        return self._parameters
-
-    @property
-    def attributes(self) -> Dict[str, Any]:
-        """
-        Get the additional attributes of the URL.
-
-        :return: The attributes as a dictionary.
-        :rtype: Dict[str, Any]
-        """
-        return self._attributes
-
-    def to_str(self, encode: bool = False) -> str:
-        """
-        Converts the URL to a string.
-
-        :param encode: Determines if the URL should be encoded. Defaults to False.
-        :type encode: bool
-        :return: The URL string.
-        :rtype: str
-        """
-        # Construct the netloc part
-        if self.username and self.password:
-            netloc = f"{self.username}:{self.password}@{self.host}"
-        else:
-            netloc = self.host
-
-        if self.port:
-            netloc = f"{netloc}:{self.port}"
-
-        # Convert parameters dictionary to query string
-        query = urlencode(self.parameters)
-
-        # Construct the URL
-        url = urlunparse((self.scheme or "", netloc, self.path or "/", "", query, ""))
-
-        if encode:
-            url = parse.quote(url)
-
-        return url
-
-    def copy(self) -> "URL":
-        """
-        Copy the URL object.
-
-        :return: A shallow copy of the URL object.
-        :rtype: URL
-        """
-        return copy.copy(self)
-
-    def deepcopy(self) -> "URL":
-        """
-        Deep copy the URL object.
-
-        :return: A deep copy of the URL object.
-        :rtype: URL
-        """
-        return copy.deepcopy(self)
-
-    def __copy__(self) -> "URL":
-        return URL(
-            self.scheme,
-            self.host,
-            self.port,
-            self.username,
-            self.password,
-            self.path,
-            self.parameters.copy(),
-            self.attributes.copy(),
-        )
-
-    def __deepcopy__(self, memo) -> "URL":
-        return URL(
-            self.scheme,
-            self.host,
-            self.port,
-            self.username,
-            self.password,
-            self.path,
-            copy.deepcopy(self.parameters, memo),
-            copy.deepcopy(self.attributes, memo),
-        )
-
-    def __str__(self) -> str:
-        return self.to_str()
-
-    def __repr__(self) -> str:
-        return self.to_str()
diff --git a/dubbo/common/utils.py b/dubbo/common/utils.py
deleted file mode 100644
index 4b20998..0000000
--- a/dubbo/common/utils.py
+++ /dev/null
@@ -1,129 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-__all__ = ["EventHelper", "FutureHelper"]
-
-
-class EventHelper:
-    """
-    Helper class for event operations.
-    """
-
-    @staticmethod
-    def is_set(event) -> bool:
-        """
-        Check if the event is set.
-
-        :param event: Event object, you can use threading.Event or any other object that supports the is_set operation.
-        :type event: Any
-        :return: True if the event is set, or False if the is_set method is not supported or the event is invalid.
-        :rtype: bool
-        """
-        return event.is_set() if event and hasattr(event, "is_set") else False
-
-    @staticmethod
-    def set(event) -> bool:
-        """
-        Attempt to set the event object.
-
-        :param event: Event object, you can use threading.Event or any other object that supports the set operation.
-        :type event: Any
-        :return: True if the event was set, False otherwise
-                (such as the event is invalid or does not support the set operation).
-        :rtype: bool
-        """
-        if event is None:
-            return False
-
-        # If the event supports the set operation, set the event and return True
-        if hasattr(event, "set"):
-            event.set()
-            return True
-
-        # If the event is invalid or does not support the set operation, return False
-        return False
-
-    @staticmethod
-    def clear(event) -> bool:
-        """
-        Attempt to clear the event object.
-
-        :param event: Event object, you can use threading.Event or any other object that supports the clear operation.
-        :type event: Any
-        :return: True if the event was cleared, False otherwise
-                (such as the event is invalid or does not support the clear operation).
-        :rtype: bool
-        """
-        if not event:
-            return False
-
-        # If the event supports the clear operation, clear the event and return True
-        if hasattr(event, "clear"):
-            event.clear()
-            return True
-
-        # If the event is invalid or does not support the clear operation, return False
-        return False
-
-
-class FutureHelper:
-    """
-    Helper class for future operations.
-    """
-
-    @staticmethod
-    def done(future) -> bool:
-        """
-        Check if the future is done.
-
-        :param future: Future object
-        :type future: Any
-        :return: True if the future is done, False otherwise.
-        :rtype: bool
-        """
-        return future.done() if future and hasattr(future, "done") else False
-
-    @staticmethod
-    def set_result(future, result):
-        """
-        Set the result of the future.
-
-        :param future: Future object
-        :type future: Any
-        :param result: Result to set
-        :type result: Any
-        """
-        if not future or FutureHelper.done(future):
-            return
-
-        if hasattr(future, "set_result"):
-            future.set_result(result)
-
-    @staticmethod
-    def set_exception(future, exception):
-        """
-        Set the exception to the future.
-
-        :param future: Future object
-        :type future: Any
-        :param exception: Exception to set
-        :type exception: Exception
-        """
-        if not future or FutureHelper.done(future):
-            return
-
-        if hasattr(future, "set_exception"):
-            future.set_exception(exception)
diff --git a/dubbo/config/__init__.py b/dubbo/config/__init__.py
deleted file mode 100644
index 7ffd615..0000000
--- a/dubbo/config/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from .logger_config import FileLoggerConfig, LoggerConfig
-from .protocol_config import ProtocolConfig
-from .reference_config import ReferenceConfig
diff --git a/dubbo/config/logger_config.py b/dubbo/config/logger_config.py
deleted file mode 100644
index ecae584..0000000
--- a/dubbo/config/logger_config.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from dataclasses import dataclass
-from typing import Dict, Optional
-
-from dubbo.common.url import URL
-from dubbo.extension import extensionLoader
-from dubbo.logger import LoggerAdapter
-from dubbo.logger import constants as logger_constants
-from dubbo.logger import loggerFactory
-from dubbo.logger.constants import Level
-
-
-@dataclass
-class FileLoggerConfig:
-    """
-    File logger configuration.
-    :param rotate: File rotate type.
-    :type rotate: logger_constants.FileRotateType
-    :param file_formatter: File formatter.
-    :type file_formatter: Optional[str]
-    :param file_dir: File directory.
-    :type file_dir: str
-    :param file_name: File name.
-    :type file_name: str
-    :param backup_count: Backup count.
-    :type backup_count: int
-    :param max_bytes: Max bytes.
-    :type max_bytes: int
-    :param interval: Interval.
-    :type interval: int
-    """
-
-    rotate: logger_constants.FileRotateType = logger_constants.FileRotateType.NONE
-    file_formatter: Optional[str] = None
-    file_dir: str = logger_constants.DEFAULT_FILE_DIR_VALUE
-    file_name: str = logger_constants.DEFAULT_FILE_NAME_VALUE
-    backup_count: int = logger_constants.DEFAULT_FILE_BACKUP_COUNT_VALUE
-    max_bytes: int = logger_constants.DEFAULT_FILE_MAX_BYTES_VALUE
-    interval: int = logger_constants.DEFAULT_FILE_INTERVAL_VALUE
-
-    def check(self) -> None:
-        if self.rotate == logger_constants.FileRotateType.SIZE and self.max_bytes < 0:
-            raise ValueError("Max bytes can't be less than 0")
-        elif self.rotate == logger_constants.FileRotateType.TIME and self.interval < 1:
-            raise ValueError("Interval can't be less than 1")
-
-    def dict(self) -> Dict[str, str]:
-        return {
-            logger_constants.FILE_DIR_KEY: self.file_dir,
-            logger_constants.FILE_NAME_KEY: self.file_name,
-            logger_constants.FILE_ROTATE_KEY: self.rotate.value,
-            logger_constants.FILE_MAX_BYTES_KEY: str(self.max_bytes),
-            logger_constants.FILE_INTERVAL_KEY: str(self.interval),
-            logger_constants.FILE_BACKUP_COUNT_KEY: str(self.backup_count),
-        }
-
-
-class LoggerConfig:
-    """
-    Logger configuration.
-    """
-
-    __slots__ = [
-        "_driver",
-        "_level",
-        "_console_enabled",
-        "_console_config",
-        "_file_enabled",
-        "_file_config",
-    ]
-
-    def __init__(
-        self,
-        driver,
-        level: Level,
-        console_enabled: bool,
-        file_enabled: bool,
-        file_config: FileLoggerConfig,
-    ):
-        """
-        Initialize the logger configuration.
-        :param driver: The logger driver.
-        :type driver: str
-        :param level: The logger level.
-        :type level: Level
-        :param console_enabled: Whether to enable console logger.
-        :type console_enabled: bool
-        :param file_enabled: Whether to enable file logger.
-        :type file_enabled: bool
-        :param file_config: The file logger configuration.
-        :type file_config: FileLogger
-        """
-        # set global config
-        self._driver = driver
-        self._level = level
-        # set console config
-        self._console_enabled = console_enabled
-        # set file comfig
-        self._file_enabled = file_enabled
-        self._file_config = file_config
-        if file_enabled:
-            self._file_config.check()
-
-    def get_url(self) -> URL:
-        # get LoggerConfig parameters
-        parameters = {
-            logger_constants.DRIVER_KEY: self._driver,
-            logger_constants.LEVEL_KEY: self._level.value,
-            logger_constants.CONSOLE_ENABLED_KEY: str(self._console_enabled),
-            logger_constants.FILE_ENABLED_KEY: str(self._file_enabled),
-            **self._file_config.dict(),
-        }
-
-        return URL(scheme=self._driver, host=self._level.value, parameters=parameters)
-
-    def init(self):
-        # get logger_adapter and initialize loggerFactory
-        logger_adapter_class = extensionLoader.get_extension(
-            LoggerAdapter, self._driver
-        )
-        logger_adapter = logger_adapter_class(self.get_url())
-        loggerFactory.set_logger_adapter(logger_adapter)
-
-    @classmethod
-    def default_config(cls):
-        """
-        Get default logger configuration.
-        """
-        return LoggerConfig(
-            driver=logger_constants.DEFAULT_DRIVER_VALUE,
-            level=logger_constants.DEFAULT_LEVEL_VALUE,
-            console_enabled=logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE,
-            file_enabled=logger_constants.DEFAULT_FILE_ENABLED_VALUE,
-            file_config=FileLoggerConfig(),
-        )
diff --git a/dubbo/config/protocol_config.py b/dubbo/config/protocol_config.py
deleted file mode 100644
index d629e1f..0000000
--- a/dubbo/config/protocol_config.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class ProtocolConfig:
-
-    _name: str
-
-    __slots__ = ["_name"]
-
-    @property
-    def name(self) -> str:
-        return self._name
-
-    @name.setter
-    def name(self, value: str):
-        self._name = value
diff --git a/dubbo/config/reference_config.py b/dubbo/config/reference_config.py
deleted file mode 100644
index a7f258c..0000000
--- a/dubbo/config/reference_config.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-from typing import Optional, Union
-
-from dubbo.common import URL, create_url
-from dubbo.extension import extensionLoader
-from dubbo.protocol import Invoker, Protocol
-
-
-class ReferenceConfig:
-
-    __slots__ = [
-        "_initialized",
-        "_global_lock",
-        "_service_name",
-        "_url",
-        "_protocol",
-        "_invoker",
-    ]
-
-    def __init__(self, url: Union[str, URL], service_name: str):
-        self._initialized = False
-        self._global_lock = threading.Lock()
-        self._url: URL = url if isinstance(url, URL) else create_url(url)
-        self._service_name = service_name
-        self._protocol: Optional[Protocol] = None
-        self._invoker: Optional[Invoker] = None
-
-    def get_invoker(self) -> Invoker:
-        if not self._invoker:
-            self._do_init()
-        return self._invoker
-
-    def _do_init(self):
-        with self._global_lock:
-            if self._initialized:
-                return
-            # Get the interface name from the URL path
-            self._url.path = self._service_name
-            self._protocol = extensionLoader.get_extension(Protocol, self._url.scheme)(
-                self._url
-            )
-            self._create_invoker()
-            self._initialized = True
-
-    def _create_invoker(self):
-        self._invoker = self._protocol.refer(self._url)
diff --git a/dubbo/config/service_config.py b/dubbo/config/service_config.py
deleted file mode 100644
index a4f3644..0000000
--- a/dubbo/config/service_config.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Optional
-
-from dubbo.common import URL
-from dubbo.common import constants as common_constants
-from dubbo.extension import extensionLoader
-from dubbo.protocol import Protocol
-from dubbo.proxy.handlers import RpcServiceHandler
-
-__all__ = ["ServiceConfig"]
-
-
-class ServiceConfig:
-    """
-    Service configuration
-    """
-
-    def __init__(
-        self,
-        service_handler: RpcServiceHandler,
-        port: Optional[int] = None,
-        protocol: Optional[str] = None,
-    ):
-
-        self._service_handler = service_handler
-        self._port = port or common_constants.DEFAULT_PORT
-
-        protocol_str = protocol or common_constants.TRIPLE_SHORT
-
-        self._export_url = URL(
-            protocol_str, common_constants.LOCAL_HOST_KEY, self._port
-        )
-        self._export_url.attributes[common_constants.SERVICE_HANDLER_KEY] = (
-            service_handler
-        )
-
-        self._protocol: Protocol = extensionLoader.get_extension(
-            Protocol, protocol_str
-        )(self._export_url)
-
-        self._exported = False
-        self._exporting = False
-
-    def export(self):
-        """
-        Export service
-        """
-        if self._exporting or self._exported:
-            return
-
-        self._exporting = True
-        try:
-            self._protocol.export(self._export_url)
-            self._exported = True
-        finally:
-            self._exporting = False
diff --git a/dubbo/extension/registries.py b/dubbo/extension/registries.py
index 37c7bc7..f98974f 100644
--- a/dubbo/extension/registries.py
+++ b/dubbo/extension/registries.py
@@ -62,6 +62,7 @@ loadBalanceRegistry = ExtendedRegistry(
     interface=LoadBalance,
     impls={
         "random": "dubbo.cluster.loadbalances.RandomLoadBalance",
+        "cpu": "dubbo.cluster.loadbalances.CpuLoadBalance",
     },
 )
 
diff --git a/dubbo/logger/__init__.py b/dubbo/logger/__init__.py
deleted file mode 100644
index 4f42594..0000000
--- a/dubbo/logger/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._interfaces import Logger, LoggerAdapter
-from .logger_factory import LoggerFactory as _LoggerFactory
-
-# The logger factory instance.
-loggerFactory = _LoggerFactory()
-
-__all__ = ["Logger", "LoggerAdapter", "loggerFactory"]
diff --git a/dubbo/logger/_interfaces.py b/dubbo/logger/_interfaces.py
deleted file mode 100644
index 88fa999..0000000
--- a/dubbo/logger/_interfaces.py
+++ /dev/null
@@ -1,204 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-from typing import Any
-
-from dubbo.common.url import URL
-
-from .constants import Level
-
-_all__ = ["Logger", "LoggerAdapter"]
-
-
-class Logger(abc.ABC):
-    """
-    Logger Interface, which is used to log messages.
-    """
-
-    @abc.abstractmethod
-    def log(self, level: Level, msg: str, *args: Any, **kwargs: Any) -> None:
-        """
-        Log a message at the specified logging level.
-
-        :param level: The logging level.
-        :type level: Level
-        :param msg: The log message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def debug(self, msg: str, *args, **kwargs) -> None:
-        """
-        Log a debug message.
-
-        :param msg: The debug message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def info(self, msg: str, *args, **kwargs) -> None:
-        """
-        Log an info message.
-
-        :param msg: The info message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def warning(self, msg: str, *args, **kwargs) -> None:
-        """
-        Log a warning message.
-
-        :param msg: The warning message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def error(self, msg: str, *args, **kwargs) -> None:
-        """
-        Log an error message.
-
-        :param msg: The error message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def critical(self, msg: str, *args, **kwargs) -> None:
-        """
-        Log a critical message.
-
-        :param msg: The critical message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def fatal(self, msg: str, *args, **kwargs) -> None:
-        """
-        Log a fatal message.
-
-        :param msg: The fatal message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def exception(self, msg: str, *args, **kwargs) -> None:
-        """
-        Log an exception message.
-
-        :param msg: The exception message.
-        :type msg: str
-        :param args: Additional positional arguments.
-        :type args: Any
-        :param kwargs: Additional keyword arguments.
-        :type kwargs: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def is_enabled_for(self, level: Level) -> bool:
-        """
-        Check if this logger is enabled for the specified level.
-
-        :param level: The logging level.
-        :type level: Level
-        :return: Whether the logging level is enabled.
-        :rtype: bool
-        """
-        raise NotImplementedError()
-
-
-class LoggerAdapter(abc.ABC):
-    """
-    Logger Adapter Interface, which is used to support different logging libraries.
-    """
-
-    __slots__ = ["_config"]
-
-    def __init__(self, config: URL):
-        """
-        Initialize the logger adapter.
-
-        :param config: The configuration of the logger adapter.
-        :type config: URL
-        """
-        self._config = config
-
-    def get_logger(self, name: str) -> Logger:
-        """
-        Get a logger by name.
-
-        :param name: The name of the logger.
-        :type name: str
-        :return: An instance of the logger.
-        :rtype: Logger
-        """
-        raise NotImplementedError()
-
-    @property
-    def level(self) -> Level:
-        """
-        Get the current logging level.
-
-        :return: The current logging level.
-        :rtype: Level
-        """
-        raise NotImplementedError()
-
-    @level.setter
-    def level(self, level: Level) -> None:
-        """
-        Set the logging level.
-
-        :param level: The logging level to set.
-        :type level: Level
-        """
-        raise NotImplementedError()
diff --git a/dubbo/logger/constants.py b/dubbo/logger/constants.py
deleted file mode 100644
index a6cae5d..0000000
--- a/dubbo/logger/constants.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import enum
-import os
-
-__all__ = [
-    "Level",
-    "FileRotateType",
-    "LEVEL_KEY",
-    "DRIVER_KEY",
-    "CONSOLE_ENABLED_KEY",
-    "FILE_ENABLED_KEY",
-    "FILE_DIR_KEY",
-    "FILE_NAME_KEY",
-    "FILE_ROTATE_KEY",
-    "FILE_MAX_BYTES_KEY",
-    "FILE_INTERVAL_KEY",
-    "FILE_BACKUP_COUNT_KEY",
-    "DEFAULT_DRIVER_VALUE",
-    "DEFAULT_LEVEL_VALUE",
-    "DEFAULT_CONSOLE_ENABLED_VALUE",
-    "DEFAULT_FILE_ENABLED_VALUE",
-    "DEFAULT_FILE_DIR_VALUE",
-    "DEFAULT_FILE_NAME_VALUE",
-    "DEFAULT_FILE_MAX_BYTES_VALUE",
-    "DEFAULT_FILE_INTERVAL_VALUE",
-    "DEFAULT_FILE_BACKUP_COUNT_VALUE",
-]
-
-
-@enum.unique
-class Level(enum.Enum):
-    """
-    The logging level enum.
-
-    :cvar DEBUG: Debug level.
-    :cvar INFO: Info level.
-    :cvar WARNING: Warning level.
-    :cvar ERROR: Error level.
-    :cvar CRITICAL: Critical level.
-    :cvar FATAL: Fatal level.
-    :cvar UNKNOWN: Unknown level.
-    """
-
-    DEBUG = "DEBUG"
-    INFO = "INFO"
-    WARNING = "WARNING"
-    ERROR = "ERROR"
-    CRITICAL = "CRITICAL"
-    FATAL = "FATAL"
-    UNKNOWN = "UNKNOWN"
-
-    @classmethod
-    def get_level(cls, level_value: str) -> "Level":
-        """
-        Get the level from the level value.
-
-        :param level_value: The level value.
-        :type level_value: str
-        :return: The level. If the level value is invalid, return UNKNOWN.
-        :rtype: Level
-        """
-        level_value = level_value.upper()
-        for level in cls:
-            if level_value == level.value:
-                return level
-        return cls.UNKNOWN
-
-
-@enum.unique
-class FileRotateType(enum.Enum):
-    """
-    The file rotating type enum.
-
-    :cvar NONE: No rotating.
-    :cvar SIZE: Rotate the file by size.
-    :cvar TIME: Rotate the file by time.
-    """
-
-    NONE = "NONE"
-    SIZE = "SIZE"
-    TIME = "TIME"
-
-
-"""logger config keys"""
-# global config
-LEVEL_KEY = "logger.level"
-DRIVER_KEY = "logger.driver"
-
-# console config
-CONSOLE_ENABLED_KEY = "logger.console.enable"
-
-# file logger
-FILE_ENABLED_KEY = "logger.file.enable"
-FILE_DIR_KEY = "logger.file.dir"
-FILE_NAME_KEY = "logger.file.name"
-FILE_ROTATE_KEY = "logger.file.rotate"
-FILE_MAX_BYTES_KEY = "logger.file.maxbytes"
-FILE_INTERVAL_KEY = "logger.file.interval"
-FILE_BACKUP_COUNT_KEY = "logger.file.backupcount"
-
-"""some logger default value"""
-DEFAULT_DRIVER_VALUE = "logging"
-DEFAULT_LEVEL_VALUE = Level.DEBUG
-# console
-DEFAULT_CONSOLE_ENABLED_VALUE = True
-# file
-DEFAULT_FILE_ENABLED_VALUE = False
-DEFAULT_FILE_DIR_VALUE = os.path.expanduser("~")
-DEFAULT_FILE_NAME_VALUE = "dubbo.log"
-DEFAULT_FILE_MAX_BYTES_VALUE = 10 * 1024 * 1024
-DEFAULT_FILE_INTERVAL_VALUE = 1
-DEFAULT_FILE_BACKUP_COUNT_VALUE = 10
diff --git a/dubbo/logger/logger_factory.py b/dubbo/logger/logger_factory.py
deleted file mode 100644
index 0a7d0b2..0000000
--- a/dubbo/logger/logger_factory.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-from typing import Dict, Optional
-
-from dubbo.common import SingletonBase
-from dubbo.common.url import URL
-from dubbo.logger import Logger, LoggerAdapter
-from dubbo.logger import constants as logger_constants
-from dubbo.logger.constants import Level
-
-__all__ = ["LoggerFactory"]
-
-# Default logger config with default values.
-_DEFAULT_CONFIG = URL(
-    scheme=logger_constants.DEFAULT_DRIVER_VALUE,
-    host=logger_constants.DEFAULT_LEVEL_VALUE.value,
-    parameters={
-        logger_constants.DRIVER_KEY: logger_constants.DEFAULT_DRIVER_VALUE,
-        logger_constants.LEVEL_KEY: logger_constants.DEFAULT_LEVEL_VALUE.value,
-        logger_constants.CONSOLE_ENABLED_KEY: str(
-            logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE
-        ),
-        logger_constants.FILE_ENABLED_KEY: str(
-            logger_constants.DEFAULT_FILE_ENABLED_VALUE
-        ),
-    },
-)
-
-
-class LoggerFactory(SingletonBase):
-    """
-    Singleton factory class for creating and managing loggers.
-
-    This class ensures a single instance of the logger factory, provides methods to set and get
-    logger adapters, and manages logger instances.
-    """
-
-    def __init__(self):
-        """
-        Initialize the logger factory.
-
-        This method sets up the internal lock, logger adapter, and logger cache.
-        """
-        self._lock = threading.RLock()
-        self._logger_adapter: Optional[LoggerAdapter] = None
-        self._loggers: Dict[str, Logger] = {}
-
-    def _ensure_logger_adapter(self) -> None:
-        """
-        Ensure the logger adapter is set.
-
-        If the logger adapter is not set, this method sets it to the default adapter.
-        """
-        if not self._logger_adapter:
-            with self._lock:
-                if not self._logger_adapter:
-                    # Import here to avoid circular imports
-                    from dubbo.logger.logging.logger_adapter import LoggingLoggerAdapter
-
-                    self.set_logger_adapter(LoggingLoggerAdapter(_DEFAULT_CONFIG))
-
-    def set_logger_adapter(self, logger_adapter: LoggerAdapter) -> None:
-        """
-        Set the logger adapter.
-
-        :param logger_adapter: The new logger adapter to use.
-        :type logger_adapter: LoggerAdapter
-        """
-        with self._lock:
-            self._logger_adapter = logger_adapter
-            # Update all loggers
-            self._loggers = {
-                name: self._logger_adapter.get_logger(name) for name in self._loggers
-            }
-
-    def get_logger_adapter(self) -> LoggerAdapter:
-        """
-        Get the current logger adapter.
-
-        :return: The current logger adapter.
-        :rtype: LoggerAdapter
-        """
-        self._ensure_logger_adapter()
-        return self._logger_adapter
-
-    def get_logger(self, name: str) -> Logger:
-        """
-        Get the logger by name.
-
-        :param name: The name of the logger to retrieve.
-        :type name: str
-        :return: An instance of the requested logger.
-        :rtype: Logger
-        """
-        self._ensure_logger_adapter()
-        logger = self._loggers.get(name)
-        if not logger:
-            with self._lock:
-                if name not in self._loggers:
-                    self._loggers[name] = self._logger_adapter.get_logger(name)
-                logger = self._loggers[name]
-        return logger
-
-    def get_level(self) -> Level:
-        """
-        Get the current logging level.
-
-        :return: The current logging level.
-        :rtype: Level
-        """
-        self._ensure_logger_adapter()
-        return self._logger_adapter.level
diff --git a/dubbo/logger/logging/formatter.py b/dubbo/logger/logging/formatter.py
deleted file mode 100644
index 1dc409e..0000000
--- a/dubbo/logger/logging/formatter.py
+++ /dev/null
@@ -1,89 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import re
-from enum import Enum
-
-__all__ = ["ColorFormatter", "NoColorFormatter", "Colors"]
-
-
-class Colors(Enum):
-    """
-    Colors for log messages.
-    """
-
-    END = "\033[0m"
-    BOLD = "\033[1m"
-    BLUE = "\033[34m"
-    GREEN = "\033[32m"
-    PURPLE = "\033[35m"
-    CYAN = "\033[36m"
-    RED = "\033[31m"
-    YELLOW = "\033[33m"
-    GREY = "\033[38;5;240m"
-
-
-LEVEL_MAP = {
-    "DEBUG": Colors.BLUE.value,
-    "INFO": Colors.GREEN.value,
-    "WARNING": Colors.YELLOW.value,
-    "ERROR": Colors.RED.value,
-    "CRITICAL": Colors.RED.value + Colors.BOLD.value,
-}
-
-DATE_FORMAT: str = "%Y-%m-%d %H:%M:%S"
-
-LOG_FORMAT: str = (
-    f"{Colors.GREEN.value}%(asctime)s{Colors.END.value}"
-    " | "
-    f"%(level_color)s%(levelname)s{Colors.END.value}"
-    " | "
-    f"{Colors.CYAN.value}%(module)s:%(funcName)s:%(lineno)d{Colors.END.value}"
-    " - "
-    f"{Colors.PURPLE.value}[Dubbo]{Colors.END.value} "
-    f"%(msg_color)s%(message)s{Colors.END.value}"
-)
-
-
-class ColorFormatter(logging.Formatter):
-    """
-    A formatter with color.
-    It will format the log message like this:
-    2024-06-24 16:39:57 | DEBUG | test_logger_factory:test_with_config:44 - [Dubbo] debug log
-    """
-
-    def __init__(self):
-        self.log_format = LOG_FORMAT
-        super().__init__(self.log_format, DATE_FORMAT)
-
-    def format(self, record) -> str:
-        levelname = record.levelname
-        record.level_color = record.msg_color = LEVEL_MAP.get(levelname)
-        return super().format(record)
-
-
-class NoColorFormatter(logging.Formatter):
-    """
-    A formatter without color.
-    It will format the log message like this:
-    2024-06-24 16:39:57 | DEBUG | test_logger_factory:test_with_config:44 - [Dubbo] debug log
-    """
-
-    def __init__(self):
-        color_re = re.compile(r"\033\[[0-9;]*\w|%\((msg_color|level_color)\)s")
-        self.log_format = color_re.sub("", LOG_FORMAT)
-        super().__init__(self.log_format, DATE_FORMAT)
diff --git a/dubbo/logger/logging/logger.py b/dubbo/logger/logging/logger.py
deleted file mode 100644
index d8feb77..0000000
--- a/dubbo/logger/logging/logger.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import Dict
-
-from dubbo.logger import Logger
-
-from ..constants import Level
-
-__all__ = ["LoggingLogger"]
-
-# The mapping from the logging level to the logging level.
-LEVEL_MAP: Dict[Level, int] = {
-    Level.DEBUG: logging.DEBUG,
-    Level.INFO: logging.INFO,
-    Level.WARNING: logging.WARNING,
-    Level.ERROR: logging.ERROR,
-    Level.CRITICAL: logging.CRITICAL,
-    Level.FATAL: logging.FATAL,
-}
-
-STACKLEVEL_KEY = "stacklevel"
-STACKLEVEL_DEFAULT = 1
-STACKLEVEL_OFFSET = 2
-
-EXC_INFO_KEY = "exc_info"
-EXC_INFO_DEFAULT = True
-
-
-class LoggingLogger(Logger):
-    """
-    The logging logger implementation.
-    """
-
-    __slots__ = ["_logger"]
-
-    def __init__(self, internal_logger: logging.Logger):
-        """
-        Initialize the logger.
-        :param internal_logger: The internal logger.
-        :type internal_logger: logging
-        """
-        self._logger = internal_logger
-
-    def _log(self, level: int, msg: str, *args, **kwargs) -> None:
-        # Add the stacklevel to the keyword arguments.
-        kwargs[STACKLEVEL_KEY] = (
-            kwargs.get(STACKLEVEL_KEY, STACKLEVEL_DEFAULT) + STACKLEVEL_OFFSET
-        )
-        self._logger.log(level, msg, *args, **kwargs)
-
-    def log(self, level: Level, msg: str, *args, **kwargs) -> None:
-        self._log(LEVEL_MAP[level], msg, *args, **kwargs)
-
-    def debug(self, msg: str, *args, **kwargs) -> None:
-        self._log(logging.DEBUG, msg, *args, **kwargs)
-
-    def info(self, msg: str, *args, **kwargs) -> None:
-        self._log(logging.INFO, msg, *args, **kwargs)
-
-    def warning(self, msg: str, *args, **kwargs) -> None:
-        self._log(logging.WARNING, msg, *args, **kwargs)
-
-    def error(self, msg: str, *args, **kwargs) -> None:
-        self._log(logging.ERROR, msg, *args, **kwargs)
-
-    def critical(self, msg: str, *args, **kwargs) -> None:
-        self._log(logging.CRITICAL, msg, *args, **kwargs)
-
-    def fatal(self, msg: str, *args, **kwargs) -> None:
-        self._log(logging.FATAL, msg, *args, **kwargs)
-
-    def exception(self, msg: str, *args, **kwargs) -> None:
-        if kwargs.get(EXC_INFO_KEY) is None:
-            kwargs[EXC_INFO_KEY] = EXC_INFO_DEFAULT
-        self.error(msg, *args, **kwargs)
-
-    def is_enabled_for(self, level: Level) -> bool:
-        logging_level = LEVEL_MAP.get(level)
-        return self._logger.isEnabledFor(logging_level) if logging_level else False
diff --git a/dubbo/logger/logging/logger_adapter.py b/dubbo/logger/logging/logger_adapter.py
deleted file mode 100644
index 3e60813..0000000
--- a/dubbo/logger/logging/logger_adapter.py
+++ /dev/null
@@ -1,186 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-import sys
-from functools import cache
-from logging import handlers
-
-from dubbo.common import constants as common_constants
-from dubbo.common.url import URL
-from dubbo.logger import Logger, LoggerAdapter
-from dubbo.logger import constants as logger_constants
-from dubbo.logger.constants import LEVEL_KEY, Level
-from dubbo.logger.logging import formatter
-from dubbo.logger.logging.logger import LoggingLogger
-
-"""This module provides the logging logger implementation. -> logging module"""
-
-__all__ = ["LoggingLoggerAdapter"]
-
-
-class LoggingLoggerAdapter(LoggerAdapter):
-    """
-    Internal logger adapter responsible for creating loggers and encapsulating the logging.getLogger() method.
-    """
-
-    __slots__ = ["_level"]
-
-    def __init__(self, config: URL):
-        """
-        Initialize the LoggingLoggerAdapter with the given configuration.
-
-        :param config: The configuration URL for the logger adapter.
-        :type config: URL
-        """
-        super().__init__(config)
-        # Set level
-        level_name = config.parameters.get(LEVEL_KEY)
-        self._level = Level.get_level(level_name) if level_name else Level.DEBUG
-        self._update_level()
-
-    def get_logger(self, name: str) -> Logger:
-        """
-        Create a logger instance by name.
-
-        :param name: The logger name.
-        :type name: str
-        :return: An instance of the logger.
-        :rtype: Logger
-        """
-        logger_instance = logging.getLogger(name)
-        # clean up handlers
-        logger_instance.handlers.clear()
-
-        # Add console handler
-        console_enabled = self._config.parameters.get(
-            logger_constants.CONSOLE_ENABLED_KEY,
-            str(logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE),
-        )
-        if console_enabled.lower() == common_constants.TRUE_VALUE or bool(
-            sys.stdout and sys.stdout.isatty()
-        ):
-            logger_instance.addHandler(self._get_console_handler())
-
-        # Add file handler
-        file_enabled = self._config.parameters.get(
-            logger_constants.FILE_ENABLED_KEY,
-            str(logger_constants.DEFAULT_FILE_ENABLED_VALUE),
-        )
-        if file_enabled.lower() == common_constants.TRUE_VALUE:
-            logger_instance.addHandler(self._get_file_handler())
-
-        if not logger_instance.handlers:
-            # It's intended to be used to avoid the "No handlers could be found for logger XXX" one-off warning.
-            logger_instance.addHandler(logging.NullHandler())
-
-        return LoggingLogger(logger_instance)
-
-    @cache
-    def _get_console_handler(self) -> logging.StreamHandler:
-        """
-        Get the console handler, avoiding duplicate creation with caching.
-
-        :return: The console handler.
-        :rtype: logging.StreamHandler
-        """
-        console_handler = logging.StreamHandler()
-        console_handler.setFormatter(formatter.ColorFormatter())
-
-        return console_handler
-
-    @cache
-    def _get_file_handler(self) -> logging.Handler:
-        """
-        Get the file handler, avoiding duplicate creation with caching.
-
-        :return: The file handler.
-        :rtype: logging.Handler
-        """
-        # Get file path
-        file_dir = self._config.parameters.get(logger_constants.FILE_DIR_KEY)
-        file_name = self._config.parameters.get(
-            logger_constants.FILE_NAME_KEY, logger_constants.DEFAULT_FILE_NAME_VALUE
-        )
-        file_path = os.path.join(file_dir, file_name)
-        # Get backup count
-        backup_count = int(
-            self._config.parameters.get(
-                logger_constants.FILE_BACKUP_COUNT_KEY,
-                logger_constants.DEFAULT_FILE_BACKUP_COUNT_VALUE,
-            )
-        )
-        # Get rotate type
-        rotate_type = self._config.parameters.get(logger_constants.FILE_ROTATE_KEY)
-
-        # Set file Handler
-        file_handler: logging.Handler
-        if rotate_type == logger_constants.FileRotateType.SIZE.value:
-            # Set RotatingFileHandler
-            max_bytes = int(
-                self._config.parameters.get(logger_constants.FILE_MAX_BYTES_KEY)
-            )
-            file_handler = handlers.RotatingFileHandler(
-                file_path, maxBytes=max_bytes, backupCount=backup_count
-            )
-        elif rotate_type == logger_constants.FileRotateType.TIME.value:
-            # Set TimedRotatingFileHandler
-            interval = int(
-                self._config.parameters.get(logger_constants.FILE_INTERVAL_KEY)
-            )
-            file_handler = handlers.TimedRotatingFileHandler(
-                file_path, interval=interval, backupCount=backup_count
-            )
-        else:
-            # Set FileHandler
-            file_handler = logging.FileHandler(file_path)
-
-        # Add file_handler
-        file_handler.setFormatter(formatter.NoColorFormatter())
-        return file_handler
-
-    @property
-    def level(self) -> Level:
-        """
-        Get the logging level.
-
-        :return: The current logging level.
-        :rtype: Level
-        """
-        return self._level
-
-    @level.setter
-    def level(self, level: Level) -> None:
-        """
-        Set the logging level.
-
-        :param level: The logging level to set.
-        :type level: Level
-        """
-        if level == self._level or level is None:
-            return
-        self._level = level
-        self._update_level()
-
-    def _update_level(self):
-        """
-        Update the log level by modifying the root logger.
-        """
-        # Get the root logger
-        root_logger = logging.getLogger()
-        # Set the logging level
-        root_logger.setLevel(self._level.value)
diff --git a/dubbo/protocol/triple/invoker.py b/dubbo/protocol/triple/invoker.py
index 95c6147..de93e94 100644
--- a/dubbo/protocol/triple/invoker.py
+++ b/dubbo/protocol/triple/invoker.py
@@ -47,7 +47,7 @@ class TripleInvoker(Invoker):
     Triple invoker.
     """
 
-    __slots__ = ["_url", "_client", "_stream_multiplexer", "_compression", "_destroyed"]
+    __slots__ = ["_url", "_client", "_stream_multiplexer", "_compression"]
 
     def __init__(
         self, url: URL, client: Client, stream_multiplexer: StreamClientMultiplexHandler
@@ -56,8 +56,6 @@ class TripleInvoker(Invoker):
         self._client = client
         self._stream_multiplexer = stream_multiplexer
 
-        self._destroyed = False
-
     def invoke(self, invocation: RpcInvocation) -> Result:
         call_type: CallType = invocation.get_attribute(common_constants.CALL_KEY)
         result = TriResult(call_type)
@@ -202,10 +200,6 @@ class TripleInvoker(Invoker):
     def is_available(self) -> bool:
         return self._client.is_connected()
 
-    @property
-    def destroyed(self) -> bool:
-        return self._destroyed
-
     def destroy(self) -> None:
         self._client.close()
         self._client = None
diff --git a/dubbo/protocol/triple/protocol.py b/dubbo/protocol/triple/protocol.py
index 102b552..b9896c2 100644
--- a/dubbo/protocol/triple/protocol.py
+++ b/dubbo/protocol/triple/protocol.py
@@ -61,11 +61,13 @@ class TripleProtocol(Protocol):
         if self._server is not None:
             return
 
-        service_handler: RpcServiceHandler = url.attributes[
-            common_constants.SERVICE_HANDLER_KEY
-        ]
+        service_handler = url.attributes[common_constants.SERVICE_HANDLER_KEY]
 
-        self._path_resolver[service_handler.service_name] = service_handler
+        if iter(service_handler):
+            for handler in service_handler:
+                self._path_resolver[handler.service_name] = handler
+        else:
+            self._path_resolver[service_handler.service_name] = service_handler
 
         method_executor = ThreadPoolExecutor(
             thread_name_prefix=f"dubbo_tri_method_{str(uuid.uuid4())}", max_workers=10
diff --git a/dubbo/registry/protocol.py b/dubbo/registry/protocol.py
index 2a13764..ebee40f 100644
--- a/dubbo/registry/protocol.py
+++ b/dubbo/registry/protocol.py
@@ -14,9 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from dubbo.cluster import Directory
 from dubbo.cluster.directories import RegistryDirectory
 from dubbo.cluster.failfast_cluster import FailfastCluster
+from dubbo.cluster.monitor.cpu import CpuMonitor, CpuInnerRpcHandler
 from dubbo.configs import RegistryConfig
 from dubbo.constants import common_constants
 from dubbo.extension import extensionLoader
@@ -44,8 +44,15 @@ class RegistryProtocol(Protocol):
         # get the server registry
         registry = self._factory.get_registry(url)
 
-        ref_url = url.attributes[common_constants.EXPORT_KEY]
+        ref_url: URL = url.attributes[common_constants.EXPORT_KEY]
         registry.register(ref_url)
+
+        # add cpu handler
+        ref_url.attributes[common_constants.SERVICE_HANDLER_KEY] = [
+            ref_url.attributes[common_constants.SERVICE_HANDLER_KEY],
+            CpuInnerRpcHandler.get_service_handler(),
+        ]
+
         # continue the export process
         self._protocol.export(ref_url)
 
@@ -53,7 +60,10 @@ class RegistryProtocol(Protocol):
         registry = self._factory.get_registry(url)
 
         # create the directory
-        directory: Directory = RegistryDirectory(registry, self._protocol, url)
+        if url.parameters.get(common_constants.LOADBALANCE_KEY) == "cpu":
+            directory = CpuMonitor(registry, self._protocol, url)
+        else:
+            directory = RegistryDirectory(registry, self._protocol, url)
 
         # continue the refer process
         return FailfastCluster().join(directory)
diff --git a/dubbo/remoting/aio/http2/controllers.py b/dubbo/remoting/aio/http2/controllers.py
index 6642ecf..d52e2df 100644
--- a/dubbo/remoting/aio/http2/controllers.py
+++ b/dubbo/remoting/aio/http2/controllers.py
@@ -349,6 +349,12 @@ class FrameOutboundController(Controller):
         """
 
         def _inner_operation(_frame: UserActionFrames):
+
+            # -1 means the stream is not created, so we don't need to send the reset frame
+            if self._stream.id == -1:
+                return
+
+            _frame.stream_id = self._stream.id
             self._protocol.send_frame(_frame, self._stream)
 
             self._stream.close_local()
@@ -376,6 +382,7 @@ class FrameOutboundController(Controller):
         # wait and send the data frames
         while True:
             frame = await self._data_queue.get()
+            frame.stream_id = self._stream.id
             if frame is not FrameOutboundController.LAST_DATA_FRAME:
                 self._data_sent_event = asyncio.Event()
                 self._protocol.send_frame(frame, self._stream, 
self._data_sent_event)
@@ -388,6 +395,7 @@ class FrameOutboundController(Controller):
 
         # wait for the last data frame and send the trailers frame
         await self._data_sent_event.wait()
+        self._trailers.stream_id = self._stream.id
         self._protocol.send_frame(self._trailers, self._stream)
 
         # close the stream
diff --git a/dubbo/remoting/aio/http2/frames.py 
b/dubbo/remoting/aio/http2/frames.py
index 8809f8d..b216b4d 100644
--- a/dubbo/remoting/aio/http2/frames.py
+++ b/dubbo/remoting/aio/http2/frames.py
@@ -24,7 +24,7 @@ __all__ = [
     "HeadersFrame",
     "DataFrame",
     "WindowUpdateFrame",
-    "ResetStreamFrame",
+    "RstStreamFrame",
     "PingFrame",
     "UserActionFrames",
 ]
@@ -147,7 +147,7 @@ class WindowUpdateFrame(Http2Frame):
         return f"<WindowUpdateFrame stream_id={self.stream_id} 
delta={self.delta}>"
 
 
-class ResetStreamFrame(Http2Frame):
+class RstStreamFrame(Http2Frame):
     """
     HTTP/2 reset stream frame.
     """
@@ -170,7 +170,9 @@ class ResetStreamFrame(Http2Frame):
         self.error_code = error_code
 
     def __repr__(self) -> str:
-        return f"<ResetStreamFrame stream_id={self.stream_id} 
error_code={self.error_code}>"
+        return (
+            f"<RstStreamFrame stream_id={self.stream_id} 
error_code={self.error_code}>"
+        )
 
 
 class PingFrame(Http2Frame):
@@ -178,9 +180,9 @@ class PingFrame(Http2Frame):
     HTTP/2 ping frame.
     """
 
-    __slots__ = ["data"]
+    __slots__ = ["data", "ack"]
 
-    def __init__(self, data: bytes):
+    def __init__(self, data: bytes, ack: bool = False):
         """
         Initialize the HTTP/2 ping frame.
         :param data: The data.
@@ -188,10 +190,11 @@ class PingFrame(Http2Frame):
         """
         super().__init__(0, Http2FrameType.PING, False)
         self.data = data
+        self.ack = ack
 
     def __repr__(self) -> str:
         return f"<PingFrame data={self.data}>"
 
 
 # User action frames.
-UserActionFrames = Union[HeadersFrame, DataFrame, ResetStreamFrame]
+UserActionFrames = Union[HeadersFrame, DataFrame, RstStreamFrame]
diff --git a/dubbo/remoting/aio/http2/protocol.py 
b/dubbo/remoting/aio/http2/protocol.py
index fa96523..89ff26f 100644
--- a/dubbo/remoting/aio/http2/protocol.py
+++ b/dubbo/remoting/aio/http2/protocol.py
@@ -17,7 +17,7 @@ import abc
 import asyncio
 import struct
 import time
-from typing import List, Optional, Tuple
+from typing import Optional
 
 from h2.config import H2Configuration
 from h2.connection import H2Connection
@@ -32,7 +32,7 @@ from dubbo.remoting.aio.http2.frames import (
     HeadersFrame,
     Http2Frame,
     PingFrame,
-    ResetStreamFrame,
+    RstStreamFrame,
     UserActionFrames,
     WindowUpdateFrame,
 )
@@ -159,9 +159,7 @@ class AbstractHttp2Protocol(asyncio.Protocol, abc.ABC):
         """
         frame_type = frame.frame_type
         if frame_type == Http2FrameType.HEADERS:
-            self._send_headers_frame(
-                frame.stream_id, frame.headers.to_list(), frame.end_stream, 
event
-            )
+            self._send_headers_frame(frame, stream, event)
         elif frame_type == Http2FrameType.DATA:
             self._flow_controller.write_data(stream, frame, event)
         elif frame_type == Http2FrameType.RST_STREAM:
@@ -171,22 +169,25 @@ class AbstractHttp2Protocol(asyncio.Protocol, abc.ABC):
 
     def _send_headers_frame(
         self,
-        stream_id: int,
-        headers: List[Tuple[str, str]],
-        end_stream: bool,
+        frame: HeadersFrame,
+        stream: Http2Stream,
         event: Optional[asyncio.Event] = None,
     ) -> None:
         """
         Send the HTTP/2 headers frame.(thread-unsafe)
-        :param stream_id: The stream identifier.
-        :type stream_id: int
-        :param headers: The headers.
-        :type headers: List[Tuple[str, str]]
-        :param end_stream: Whether the stream is ended.
-        :type end_stream: bool
+        :param frame: The frame to send.
+        :type frame: HeadersFrame
+        :param stream: The stream.
+        :type stream: Http2Stream
         :param event: The event to be set after sending the frame.
         """
-        self._h2_connection.send_headers(stream_id, headers, 
end_stream=end_stream)
+        if stream.id == -1:
+            stream.id = self._h2_connection.get_next_available_stream_id()
+            self._stream_handler.put_stream(stream.id, stream)
+
+        self._h2_connection.send_headers(
+            stream.id, frame.headers.to_list(), end_stream=frame.end_stream
+        )
         self._flush()
         EventHelper.set(event)
 
@@ -248,7 +249,7 @@ class AbstractHttp2Protocol(asyncio.Protocol, abc.ABC):
                     if isinstance(frame, WindowUpdateFrame):
                         # Because flow control may be at the connection level, it is handled here
                         self._flow_controller.release_flow_control(frame)
-                    elif isinstance(frame, (HeadersFrame, DataFrame, 
ResetStreamFrame)):
+                    elif isinstance(frame, (HeadersFrame, DataFrame, 
RstStreamFrame)):
                         # Handle the frame by the stream handler
                         self._stream_handler.handle_frame(frame)
                     else:
@@ -331,7 +332,7 @@ class Http2ClientProtocol(AbstractHttp2Protocol):
 
     def _do_other_frame(self, frame: Http2Frame):
         # Handle the ping frame
-        if isinstance(frame, PingFrame):
+        if isinstance(frame, PingFrame) and frame.ack:
             FutureHelper.set_result(self._ping_ack_future, None)
 
     async def _heartbeat_loop(self):
diff --git a/dubbo/remoting/aio/http2/stream.py 
b/dubbo/remoting/aio/http2/stream.py
index e610d7c..c072a95 100644
--- a/dubbo/remoting/aio/http2/stream.py
+++ b/dubbo/remoting/aio/http2/stream.py
@@ -23,7 +23,7 @@ from dubbo.remoting.aio.exceptions import StreamError
 from dubbo.remoting.aio.http2.frames import (
     DataFrame,
     HeadersFrame,
-    ResetStreamFrame,
+    RstStreamFrame,
     UserActionFrames,
 )
 from dubbo.remoting.aio.http2.headers import Http2Headers
@@ -58,6 +58,13 @@ class Http2Stream(abc.ABC):
         """
         return self._id
 
+    @id.setter
+    def id(self, stream_id: int) -> None:
+        """
+        Set the stream identifier.
+        """
+        self._id = stream_id
+
     @property
     def listener(self) -> "Http2Stream.Listener":
         """
@@ -261,8 +268,8 @@ class DefaultHttp2Stream(Http2Stream):
         if self.local_closed:
             # The stream has been closed locally.
             return
-        reset_frame = ResetStreamFrame(self.id, error_code)
-        self._outbound_controller.write_rst(reset_frame)
+        rst_frame = RstStreamFrame(self.id, error_code)
+        self._outbound_controller.write_rst(rst_frame)
 
     def receive_frame(self, frame: UserActionFrames) -> None:
         """
diff --git a/dubbo/remoting/aio/http2/stream_handler.py 
b/dubbo/remoting/aio/http2/stream_handler.py
index 65ec7bd..73d2d4f 100644
--- a/dubbo/remoting/aio/http2/stream_handler.py
+++ b/dubbo/remoting/aio/http2/stream_handler.py
@@ -16,12 +16,10 @@
 
 import asyncio
 import uuid
-from concurrent import futures
 from concurrent.futures import ThreadPoolExecutor
 from typing import Callable, Dict, Optional
 
 from dubbo.loggers import loggerFactory
-from dubbo.remoting.aio.exceptions import ProtocolError
 from dubbo.remoting.aio.http2.frames import UserActionFrames
 from dubbo.remoting.aio.http2.registries import Http2FrameType
 from dubbo.remoting.aio.http2.stream import DefaultHttp2Stream, Http2Stream
@@ -107,6 +105,9 @@ class StreamMultiplexHandler:
             # It must be ensured that the event loop is not blocked,
             # and if there is a blocking operation, the executor must be used.
             stream.receive_frame(frame)
+
+            if frame.end_stream and stream.local_closed:
+                self.remove_stream(frame.stream_id)
         else:
             _LOGGER.warning(
                 f"Stream {frame.stream_id} not found. Ignoring frame {frame}"
@@ -134,19 +135,9 @@ class StreamClientMultiplexHandler(StreamMultiplexHandler):
         :return: The stream.
         :rtype: DefaultHttp2Stream
         """
-        future = futures.Future()
-        self._protocol.get_next_stream_id(future)
-        try:
-            # block until the stream_id is created
-            stream_id = future.result()
-            new_stream = DefaultHttp2Stream(
-                stream_id, listener, self._loop, self._protocol, self._executor
-            )
-            self.put_stream(stream_id, new_stream)
-        except Exception as e:
-            raise ProtocolError("Failed to create stream.") from e
-
-        return new_stream
+        return DefaultHttp2Stream(
+            -1, listener, self._loop, self._protocol, self._executor
+        )
 
 
 class StreamServerMultiplexHandler(StreamMultiplexHandler):
diff --git a/dubbo/remoting/aio/http2/utils.py 
b/dubbo/remoting/aio/http2/utils.py
index 7cc4f66..32d52f0 100644
--- a/dubbo/remoting/aio/http2/utils.py
+++ b/dubbo/remoting/aio/http2/utils.py
@@ -22,7 +22,7 @@ from dubbo.remoting.aio.http2.frames import (
     DataFrame,
     HeadersFrame,
     PingFrame,
-    ResetStreamFrame,
+    RstStreamFrame,
     WindowUpdateFrame,
 )
 from dubbo.remoting.aio.http2.headers import Http2Headers
@@ -40,14 +40,14 @@ class Http2EventUtils:
     def convert_to_frame(
         event: h2_event.Event,
     ) -> Union[
-        HeadersFrame, DataFrame, ResetStreamFrame, WindowUpdateFrame, 
PingFrame, None
+        HeadersFrame, DataFrame, RstStreamFrame, WindowUpdateFrame, PingFrame, 
None
     ]:
         """
         Convert a h2.events.Event to HTTP/2 Frame.
         :param event: The H2 event.
         :type event: h2.events.Event
         :return: The HTTP/2 frame.
-        :rtype: Union[HeadersFrame, DataFrame, ResetStreamFrame, 
WindowUpdateFrame, PingFrame, None]
+        :rtype: Union[HeadersFrame, DataFrame, RstStreamFrame, 
WindowUpdateFrame, PingFrame, None]
         """
         if isinstance(
             event,
@@ -73,14 +73,14 @@ class Http2EventUtils:
             )
         elif isinstance(event, h2_event.StreamReset):
             # RST_STREAM frame.
-            return ResetStreamFrame(
-                event.stream_id, Http2ErrorCode.get(event.error_code)
-            )
+            return RstStreamFrame(event.stream_id, 
Http2ErrorCode.get(event.error_code))
         elif isinstance(event, h2_event.WindowUpdated):
             # WINDOW_UPDATE frame.
             return WindowUpdateFrame(event.stream_id, event.delta)
-        elif isinstance(event, h2_event.PingReceived):
+        elif isinstance(event, (h2_event.PingAckReceived, 
h2_event.PingReceived)):
             # PING frame.
-            return PingFrame(event.ping_data)
+            return PingFrame(
+                event.ping_data, ack=isinstance(event, 
h2_event.PingAckReceived)
+            )
 
         return None
diff --git a/dubbo/utils.py b/dubbo/utils.py
index 47b404f..6900237 100644
--- a/dubbo/utils.py
+++ b/dubbo/utils.py
@@ -16,7 +16,11 @@
 
 import socket
 
-__all__ = ["EventHelper", "FutureHelper", "NetworkUtils"]
+__all__ = ["EventHelper", "FutureHelper", "NetworkUtils", "CpuUtils"]
+
+from typing import List, Tuple
+
+import psutil
 
 
 class EventHelper:
@@ -155,3 +159,71 @@ class NetworkUtils:
         :rtype: str
         """
         return socket.gethostbyname(NetworkUtils.get_host_name())
+
+
+class CpuUtils:
+    """
+    Helper class for CPU operations.
+    """
+
+    @staticmethod
+    def get_cpu_count(logical=True) -> int:
+        """
+        Get the number of CPUs in the system.
+
+        :return: The number of CPUs in the system.
+        :rtype: int
+        """
+        return psutil.cpu_count(logical=logical)
+
+    @staticmethod
+    def get_total_cpu_usage(interval=1) -> float:
+        """
+        Get the total CPU usage of the system.
+
+        :param interval: The interval in seconds.
+        :type interval: int
+        :return: The total CPU usage of the system.
+        :rtype: float
+        """
+        return psutil.cpu_percent(interval=interval)
+
+    @staticmethod
+    def get_per_cpu_usage(interval=1) -> List[float]:
+        """
+        Get the per CPU usage of the system.
+
+        :param interval: The interval in seconds.
+        :type interval: int
+        :return: The per CPU usage of the system.
+        :rtype: list
+        """
+        return psutil.cpu_percent(interval=interval, percpu=True)
+
+    @staticmethod
+    def get_load_avg() -> Tuple[float, float, float]:
+        """
+        Get the load average over the last 1, 5, and 15 minutes.
+
+        :return: The 1-, 5-, and 15-minute load averages of the system.
+        :rtype: Tuple[float, float, float]
+        """
+        return psutil.getloadavg()
+
+    @staticmethod
+    def get_cpu_stats():
+        """
+        Get the CPU stats of the system.
+
+        :return: The CPU stats of the system.
+        """
+        return psutil.cpu_stats()
+
+    @staticmethod
+    def get_cpu_freq():
+        """
+        Get the current CPU frequency.
+
+        :return: The current CPU frequency.
+        """
+        return psutil.cpu_freq()
diff --git a/requirements.txt b/requirements.txt
index dd0cffb..89dbdc6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 h2>=4.1.0
 uvloop>=0.19.0
-kazoo>=2.10.0
\ No newline at end of file
+kazoo>=2.10.0
+psutil>=6.0.0
\ No newline at end of file
diff --git a/samples/registry/zookeeper/client.py 
b/samples/registry/zookeeper/client.py
index 9c84db0..ff81ba7 100644
--- a/samples/registry/zookeeper/client.py
+++ b/samples/registry/zookeeper/client.py
@@ -13,6 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import time
+
 import unary_unary_pb2
 
 import dubbo
@@ -33,10 +35,14 @@ class UnaryServiceStub:
 
 
 if __name__ == "__main__":
-    registry_config = RegistryConfig.from_url("zookeeper://127.0.0.1:2181")
+    registry_config = RegistryConfig.from_url(
+        "zookeeper://127.0.0.1:2181"
+    )
     bootstrap = dubbo.Dubbo(registry_config=registry_config)
 
-    reference_config = ReferenceConfig(protocol="tri", 
service="org.apache.dubbo.samples.registry.zk")
+    reference_config = ReferenceConfig(
+        protocol="tri", service="org.apache.dubbo.samples.registry.zk"
+    )
     dubbo_client = bootstrap.create_client(reference_config)
 
     unary_service_stub = UnaryServiceStub(dubbo_client)
diff --git a/samples/stream/server_stream/client.py 
b/samples/stream/server_stream/client.py
index fa9d4c1..4b9cb39 100644
--- a/samples/stream/server_stream/client.py
+++ b/samples/stream/server_stream/client.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import unary_stream_pb2
-from setuptools.extern import names
 
 import dubbo
 from dubbo.configs import ReferenceConfig
diff --git a/setup.py b/setup.py
index edb5703..4b91932 100644
--- a/setup.py
+++ b/setup.py
@@ -56,6 +56,10 @@ setup(
     packages=find_packages(include=("dubbo", "dubbo.*")),
     test_suite="tests",
     python_requires=">=3.11",
-    install_requires=["h2>=4.1.0", "uvloop>=0.19.0; 
platform_system!='Windows'"],
+    install_requires=[
+        "h2>=4.1.0",
+        "uvloop>=0.19.0; platform_system!='Windows'",
+        "psutil>=6.0.0",
+    ],
     extras_require={"zookeeper": ["kazoo>=2.10.0"]},
 )

Reply via email to