This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-python.git
The following commit(s) were added to refs/heads/main by this push:
new 9bab773 [Feature] CPU load balancing implementation and code structure optimization (#38)
9bab773 is described below
commit 9bab773b7c5caabf6cf147965298f621246daaa0
Author: Zaki <[email protected]>
AuthorDate: Mon Aug 26 09:57:25 2024 +0800
[Feature] CPU load balancing implementation and code structure optimization (#38)
* feat&fix: Optimize the logic of h2, remove duplicate code, add cpu monitoring
* fix: Optimize some code
---
dubbo/__version__.py | 2 +-
dubbo/cluster/failfast_cluster.py | 6 +-
dubbo/cluster/loadbalances.py | 44 +++
.../logging => cluster/monitor}/__init__.py | 4 -
dubbo/cluster/monitor/cpu.py | 176 +++++++++++
dubbo/common/__init__.py | 32 --
dubbo/common/classes.py | 41 ---
dubbo/common/constants.py | 62 ----
dubbo/common/deliverers.py | 314 --------------------
dubbo/common/node.py | 58 ----
dubbo/common/types.py | 22 --
dubbo/common/url.py | 325 ---------------------
dubbo/common/utils.py | 129 --------
dubbo/config/__init__.py | 19 --
dubbo/config/logger_config.py | 150 ----------
dubbo/config/protocol_config.py | 30 --
dubbo/config/reference_config.py | 62 ----
dubbo/config/service_config.py | 71 -----
dubbo/extension/registries.py | 1 +
dubbo/logger/__init__.py | 23 --
dubbo/logger/_interfaces.py | 204 -------------
dubbo/logger/constants.py | 127 --------
dubbo/logger/logger_factory.py | 127 --------
dubbo/logger/logging/formatter.py | 89 ------
dubbo/logger/logging/logger.py | 94 ------
dubbo/logger/logging/logger_adapter.py | 186 ------------
dubbo/protocol/triple/invoker.py | 8 +-
dubbo/protocol/triple/protocol.py | 10 +-
dubbo/registry/protocol.py | 16 +-
dubbo/remoting/aio/http2/controllers.py | 8 +
dubbo/remoting/aio/http2/frames.py | 15 +-
dubbo/remoting/aio/http2/protocol.py | 35 +--
dubbo/remoting/aio/http2/stream.py | 13 +-
dubbo/remoting/aio/http2/stream_handler.py | 21 +-
dubbo/remoting/aio/http2/utils.py | 16 +-
dubbo/utils.py | 74 ++++-
requirements.txt | 3 +-
samples/registry/zookeeper/client.py | 10 +-
samples/stream/server_stream/client.py | 1 -
setup.py | 6 +-
40 files changed, 394 insertions(+), 2240 deletions(-)
diff --git a/dubbo/__version__.py b/dubbo/__version__.py
index aeae1de..7302839 100644
--- a/dubbo/__version__.py
+++ b/dubbo/__version__.py
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = "1.0.0b1"
+__version__ = "3.0.0b1"
diff --git a/dubbo/cluster/failfast_cluster.py b/dubbo/cluster/failfast_cluster.py
index 8bfe47a..ee5497e 100644
--- a/dubbo/cluster/failfast_cluster.py
+++ b/dubbo/cluster/failfast_cluster.py
@@ -15,6 +15,7 @@
# limitations under the License.
from dubbo.cluster import Cluster, Directory, LoadBalance
+from dubbo.cluster.loadbalances import CpuLoadBalance
from dubbo.constants import common_constants
from dubbo.extension import extensionLoader
from dubbo.protocol import Invoker, Result
@@ -27,13 +28,16 @@ class FailfastInvoker(Invoker):
FailfastInvoker
"""
- def __init__(self, directory: Directory, url: URL):
+ def __init__(self, directory, url: URL):
self._directory = directory
self._load_balance = extensionLoader.get_extension(
LoadBalance, url.parameters.get(common_constants.LOADBALANCE_KEY, "random")
)()
+ if isinstance(self._load_balance, CpuLoadBalance):
+ self._load_balance.set_monitor(directory)
+
def invoke(self, invocation) -> Result:
# get the invokers
diff --git a/dubbo/cluster/loadbalances.py b/dubbo/cluster/loadbalances.py
index 4b6f0b3..94e2e54 100644
--- a/dubbo/cluster/loadbalances.py
+++ b/dubbo/cluster/loadbalances.py
@@ -18,6 +18,7 @@ import random
from typing import List, Optional
from dubbo.cluster import LoadBalance
+from dubbo.cluster.monitor.cpu import CpuMonitor
from dubbo.protocol import Invocation, Invoker
@@ -63,3 +64,46 @@ class RandomLoadBalance(AbstractLoadBalance):
) -> Optional[Invoker]:
randint = random.randint(0, len(invokers) - 1)
return invokers[randint]
+
+
+class CpuLoadBalance(AbstractLoadBalance):
+ """
+ CPU load balance.
+ """
+
+ def __init__(self):
+ self._monitor: Optional[CpuMonitor] = None
+
+ def set_monitor(self, monitor: CpuMonitor) -> None:
+ """
+ Set the CPU monitor.
+ :param monitor: The CPU monitor.
+ :type monitor: CpuMonitor
+ """
+ self._monitor = monitor
+
+ def do_select(
+ self, invokers: List[Invoker], invocation: Invocation
+ ) -> Optional[Invoker]:
+ # get the CPU usage
+ cpu_usages = self._monitor.get_cpu_usage(invokers)
+ # Select the caller with the lowest CPU usage, 0 means CPU usage is unknown.
+ available_invokers = []
+ unknown_invokers = []
+
+ for invoker, cpu_usage in cpu_usages.items():
+ if cpu_usage == 0:
+ unknown_invokers.append((cpu_usage, invoker))
+ else:
+ available_invokers.append((cpu_usage, invoker))
+
+ if available_invokers:
+ # sort and select the invoker with the lowest CPU usage
+ available_invokers.sort(key=lambda x: x[0])
+ return available_invokers[0][1]
+ elif unknown_invokers:
+ # get the invoker with unknown CPU usage randomly
+ randint = random.randint(0, len(unknown_invokers) - 1)
+ return unknown_invokers[randint][1]
+ else:
+ return None
diff --git a/dubbo/logger/logging/__init__.py b/dubbo/cluster/monitor/__init__.py
similarity index 91%
rename from dubbo/logger/logging/__init__.py
rename to dubbo/cluster/monitor/__init__.py
index 10e45eb..bcba37a 100644
--- a/dubbo/logger/logging/__init__.py
+++ b/dubbo/cluster/monitor/__init__.py
@@ -13,7 +13,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from .logger_adapter import LoggerAdapter
-
-__all__ = ["LoggerAdapter"]
diff --git a/dubbo/cluster/monitor/cpu.py b/dubbo/cluster/monitor/cpu.py
new file mode 100644
index 0000000..3b3831f
--- /dev/null
+++ b/dubbo/cluster/monitor/cpu.py
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import threading
+from typing import Dict, List
+from dubbo.cluster.directories import RegistryDirectory
+from dubbo.constants import common_constants
+from dubbo.loggers import loggerFactory
+from dubbo.protocol import Protocol, Invoker
+from dubbo.protocol.invocation import RpcInvocation
+from dubbo.proxy.handlers import RpcServiceHandler, RpcMethodHandler
+from dubbo.registry import Registry
+from dubbo.types import UnaryCallType
+from dubbo.url import URL
+from dubbo.utils import CpuUtils
+
+_LOGGER = loggerFactory.get_logger()
+
+_cpu_invocation = RpcInvocation(
+ "org.apache.dubbo.MetricsService",
+ "cpu",
+ str(1).encode("utf-8"),
+ attributes={
+ common_constants.CALL_KEY: UnaryCallType,
+ },
+)
+
+
+class CpuMonitor(RegistryDirectory):
+ """
+ The CPU monitor.
+ """
+
+ def __init__(self, registry: Registry, protocol: Protocol, url: URL):
+ super().__init__(registry, protocol, url)
+
+ # interval
+ self._interval = 5
+
+ # about CPU usage
+ self._usages_lock = threading.Lock()
+ self._cpu_usages: Dict[Invoker, float] = {}
+
+ # running invokers
+ self._running_invokers: Dict[str, Invoker] = {}
+
+ # thread
+ self._started = False
+ self._thread: threading.Thread = threading.Thread(
+ target=self._monitor_cpu, daemon=True
+ )
+ self._stop_event: threading.Event = threading.Event()
+
+ # start the monitor
+ self.start()
+
+ def start(self) -> None:
+ """
+ Start the monitor.
+ """
+ if self._stop_event.is_set():
+ raise RuntimeError("The monitor has been stopped.")
+ elif self._started:
+ return
+
+ self._started = True
+ self._thread.start()
+ _LOGGER.info("The CPU monitor has been started.")
+
+ def stop(self) -> None:
+ """
+ Stop the monitor.
+ """
+ if self._stop_event.is_set():
+ return
+ # notify the thread to stop
+ self._stop_event.set()
+
+ def _monitor_cpu(self) -> None:
+ """
+ Monitor the CPU usage.
+ """
+ while True:
+ # get available invokers
+ available_invokers = {
+ url: invoker
+ for url, invoker in self._invokers.items()
+ if invoker.is_available()
+ }
+
+ # update the running invokers
+ self._running_invokers = available_invokers
+
+ # update the CPU usage
+ with self._usages_lock:
+ self._cpu_usages = {
+ invoker: usage
+ for invoker, usage in self._cpu_usages.items()
+ if invoker in available_invokers.values()
+ }
+
+ # get the CPU usage for each invoker
+ for url, invoker in self._running_invokers.items():
+ if invoker.is_available():
+ try:
+ result = invoker.invoke(_cpu_invocation)
+ cpu_usage = float(result.value().decode("utf-8"))
+ self._cpu_usages[invoker] = cpu_usage
+ except Exception as e:
+ _LOGGER.error(
+ f"Failed to get the CPU usage for invoker {url}: {str(e)}"
+ )
+ # remove the cpu usage
+ self._remove_cpu_usage(invoker)
+
+ # wait for the interval or stop
+ if self._stop_event.wait(self._interval):
+ _LOGGER.info("The CPU monitor has been stopped.")
+ break
+
+ def get_cpu_usage(self, invokers: List[Invoker]) -> Dict[Invoker, float]:
+ """
+ Get the CPU usage for the invoker.
+ :param invokers: The invokers.
+ :type invokers: List[Invoker]
+ :return: The CPU usage.
+ :rtype: Dict[Invoker, float]
+ """
+ with self._usages_lock:
+ return {invoker: self._cpu_usages.get(invoker, 0) for invoker in invokers}
+
+ def _remove_cpu_usage(self, invoker: Invoker) -> None:
+ with self._usages_lock:
+ self._cpu_usages.pop(invoker)
+
+
+class CpuInnerRpcHandler:
+ """
+ The CPU inner RPC handler.
+ """
+
+ @staticmethod
+ def get_service_handler() -> RpcServiceHandler:
+ """
+ Get the service handler.
+ :return: The service handler.
+ :rtype: RpcServiceHandler
+ """
+ return RpcServiceHandler(
+ "org.apache.dubbo.MetricsService",
+ {"cpu": RpcMethodHandler.unary(CpuInnerRpcHandler.get_cpu_usage)},
+ )
+
+ @staticmethod
+ def get_cpu_usage(interval) -> bytes:
+ """
+ Get the CPU usage.
+ :param interval: The interval.
+ :type interval: bytes
+ :return: The CPU usage.
+ :rtype: bytes
+ """
+ float_value = CpuUtils.get_total_cpu_usage(interval=int(interval.decode("utf-8")))
+ return str(float_value).encode("utf-8")
diff --git a/dubbo/common/__init__.py b/dubbo/common/__init__.py
deleted file mode 100644
index a860593..0000000
--- a/dubbo/common/__init__.py
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from .classes import SingletonBase
-from .deliverers import MultiMessageDeliverer, SingleMessageDeliverer
-from .node import Node
-from .types import DeserializingFunction, SerializingFunction
-from .url import URL, create_url
-
-__all__ = [
- "SingleMessageDeliverer",
- "MultiMessageDeliverer",
- "URL",
- "create_url",
- "Node",
- "SingletonBase",
- "DeserializingFunction",
- "SerializingFunction",
-]
diff --git a/dubbo/common/classes.py b/dubbo/common/classes.py
deleted file mode 100644
index b27c7b9..0000000
--- a/dubbo/common/classes.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-
-__all__ = ["SingletonBase"]
-
-
-class SingletonBase:
- """
- Singleton base class. This class ensures that only one instance of a derived class exists.
-
- This implementation is thread-safe.
- """
-
- _instance = None
- _instance_lock = threading.Lock()
-
- def __new__(cls, *args, **kwargs):
- """
- Create a new instance of the class if it does not exist.
- """
- if cls._instance is None:
- with cls._instance_lock:
- # double check
- if cls._instance is None:
- cls._instance = super(SingletonBase, cls).__new__(cls)
- return cls._instance
diff --git a/dubbo/common/constants.py b/dubbo/common/constants.py
deleted file mode 100644
index 33e4f9f..0000000
--- a/dubbo/common/constants.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-PROTOCOL_KEY = "protocol"
-TRIPLE = "triple"
-TRIPLE_SHORT = "tri"
-
-SIDE_KEY = "side"
-SERVER_VALUE = "server"
-CLIENT_VALUE = "client"
-
-METHOD_KEY = "method"
-SERVICE_KEY = "service"
-
-SERVICE_HANDLER_KEY = "service-handler"
-
-GROUP_KEY = "group"
-
-LOCAL_HOST_KEY = "localhost"
-LOCAL_HOST_VALUE = "127.0.0.1"
-DEFAULT_PORT = 50051
-
-SSL_ENABLED_KEY = "ssl-enabled"
-
-SERIALIZATION_KEY = "serialization"
-SERIALIZER_KEY = "serializer"
-DESERIALIZER_KEY = "deserializer"
-
-
-COMPRESSION_KEY = "compression"
-COMPRESSOR_KEY = "compressor"
-DECOMPRESSOR_KEY = "decompressor"
-
-
-TRANSPORTER_KEY = "transporter"
-TRANSPORTER_DEFAULT_VALUE = "aio"
-
-TRUE_VALUE = "true"
-FALSE_VALUE = "false"
-
-CALL_KEY = "call"
-UNARY_CALL_VALUE = "unary"
-CLIENT_STREAM_CALL_VALUE = "client-stream"
-SERVER_STREAM_CALL_VALUE = "server-stream"
-BI_STREAM_CALL_VALUE = "bi-stream"
-
-PATH_SEPARATOR = "/"
-PROTOCOL_SEPARATOR = "://"
-DYNAMIC_KEY = "dynamic"
diff --git a/dubbo/common/deliverers.py b/dubbo/common/deliverers.py
deleted file mode 100644
index 67790ec..0000000
--- a/dubbo/common/deliverers.py
+++ /dev/null
@@ -1,314 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-import enum
-import queue
-import threading
-from typing import Any, Optional
-
-__all__ = ["MessageDeliverer", "SingleMessageDeliverer", "MultiMessageDeliverer"]
-
-
-class DelivererStatus(enum.Enum):
- """
- Enumeration for deliverer status.
-
- Possible statuses:
- - PENDING: The deliverer is pending action.
- - COMPLETED: The deliverer has completed the action.
- - CANCELLED: The action for the deliverer has been cancelled.
- - FINISHED: The deliverer has finished all actions and is in a final state.
- """
-
- PENDING = 0
- COMPLETED = 1
- CANCELLED = 2
- FINISHED = 3
-
- @classmethod
- def change_allowed(
- cls, current_status: "DelivererStatus", target_status: "DelivererStatus"
- ) -> bool:
- """
- Check if a transition from `current_status` to `target_status` is allowed.
-
- :param current_status: The current status of the deliverer.
- :type current_status: DelivererStatus
- :param target_status: The target status to transition to.
- :type target_status: DelivererStatus
- :return: A boolean indicating if the transition is allowed.
- :rtype: bool
- """
- # PENDING -> COMPLETED or CANCELLED
- if current_status == cls.PENDING:
- return target_status in {cls.COMPLETED, cls.CANCELLED}
-
- # COMPLETED -> FINISHED or CANCELLED
- elif current_status == cls.COMPLETED:
- return target_status in {cls.FINISHED, cls.CANCELLED}
-
- # CANCELLED -> FINISHED
- elif current_status == cls.CANCELLED:
- return target_status == cls.FINISHED
-
- # FINISHED is the final state, no further transitions allowed
- else:
- return False
-
-
-class NoMoreMessageError(RuntimeError):
- """
- Exception raised when no more messages are available.
- """
-
- def __init__(self, message: str = "No more message"):
- super().__init__(message)
-
-
-class EmptyMessageError(RuntimeError):
- """
- Exception raised when the message is empty.
- """
-
- def __init__(self, message: str = "Message is empty"):
- super().__init__(message)
-
-
-class MessageDeliverer(abc.ABC):
- """
- Abstract base class for message deliverers.
- """
-
- __slots__ = ["_status"]
-
- def __init__(self):
- self._status = DelivererStatus.PENDING
-
- @abc.abstractmethod
- def add(self, message: Any) -> None:
- """
- Add a message to the deliverer.
-
- :param message: The message to be added.
- :type message: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def complete(self, message: Any = None) -> None:
- """
- Mark the message delivery as complete.
-
- :param message: The last message (optional).
- :type message: Any, optional
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def cancel(self, exc: Optional[Exception]) -> None:
- """
- Cancel the message delivery.
-
- :param exc: The exception that caused the cancellation.
- :type exc: Exception, optional
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def get(self) -> Any:
- """
- Get the next message.
-
- :return: The next message.
- :rtype: Any
- :raises NoMoreMessageError: If no more messages are available.
- :raises Exception: If the message delivery is cancelled.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def get_nowait(self) -> Any:
- """
- Get the next message without waiting.
-
- :return: The next message.
- :rtype: Any
- :raises EmptyMessageError: If the message is empty.
- :raises NoMoreMessageError: If no more messages are available.
- :raises Exception: If the message delivery is cancelled.
- """
- raise NotImplementedError()
-
-
-class SingleMessageDeliverer(MessageDeliverer):
- """
- Message deliverer for a single message using a signal-based approach.
- """
-
- __slots__ = ["_condition", "_message"]
-
- def __init__(self):
- super().__init__()
- self._condition = threading.Condition()
- self._message: Any = None
-
- def add(self, message: Any) -> None:
- with self._condition:
- if self._status is DelivererStatus.PENDING:
- # Add the message
- self._message = message
-
- def complete(self, message: Any = None) -> None:
- with self._condition:
- if DelivererStatus.change_allowed(self._status, DelivererStatus.COMPLETED):
- if message is not None:
- self._message = message
- # update the status
- self._status = DelivererStatus.COMPLETED
- self._condition.notify_all()
-
- def cancel(self, exc: Optional[Exception]) -> None:
- with self._condition:
- if DelivererStatus.change_allowed(self._status, DelivererStatus.CANCELLED):
- # Cancel the delivery
- self._message = exc or RuntimeError("delivery cancelled.")
- self._status = DelivererStatus.CANCELLED
- self._condition.notify_all()
-
- def get(self) -> Any:
- with self._condition:
- if self._status is DelivererStatus.FINISHED:
- raise NoMoreMessageError("Message already consumed.")
-
- if self._status is DelivererStatus.PENDING:
- # If the message is not available, wait
- self._condition.wait()
-
- # check the status
- if self._status is DelivererStatus.CANCELLED:
- raise self._message
-
- self._status = DelivererStatus.FINISHED
- return self._message
-
- def get_nowait(self) -> Any:
- with self._condition:
- if self._status is DelivererStatus.FINISHED:
- self._status = DelivererStatus.PENDING
- return self._message
-
- # raise error
- if self._status is DelivererStatus.FINISHED:
- raise NoMoreMessageError("Message already consumed.")
- elif self._status is DelivererStatus.CANCELLED:
- raise self._message
- elif self._status is DelivererStatus.PENDING:
- raise EmptyMessageError("Message is empty")
-
-
-class MultiMessageDeliverer(MessageDeliverer):
- """
- Message deliverer supporting multiple messages.
- """
-
- __slots__ = ["_lock", "_counter", "_messages", "_END_SENTINEL"]
-
- def __init__(self):
- super().__init__()
- self._lock = threading.Lock()
- self._counter = 0
- self._messages: queue.PriorityQueue[Any] = queue.PriorityQueue()
- self._END_SENTINEL = object()
-
- def add(self, message: Any) -> None:
- with self._lock:
- if self._status is DelivererStatus.PENDING:
- # Add the message
- self._counter += 1
- self._messages.put_nowait((self._counter, message))
-
- def complete(self, message: Any = None) -> None:
- with self._lock:
- if DelivererStatus.change_allowed(self._status, DelivererStatus.COMPLETED):
- if message is not None:
- self._counter += 1
- self._messages.put_nowait((self._counter, message))
-
- # Add the end sentinel
- self._counter += 1
- self._messages.put_nowait((self._counter, self._END_SENTINEL))
- self._status = DelivererStatus.COMPLETED
-
- def cancel(self, exc: Optional[Exception]) -> None:
- with self._lock:
- if DelivererStatus.change_allowed(self._status, DelivererStatus.CANCELLED):
- # Set the priority to -1 -> make sure it is the first message
- self._messages.put_nowait(
- (-1, exc or RuntimeError("delivery cancelled."))
- )
- self._status = DelivererStatus.CANCELLED
-
- def get(self) -> Any:
- if self._status is DelivererStatus.FINISHED:
- raise NoMoreMessageError("No more message")
-
- # block until the message is available
- priority, message = self._messages.get()
-
- # check the status
- if self._status is DelivererStatus.CANCELLED:
- raise message
- elif message is self._END_SENTINEL:
- self._status = DelivererStatus.FINISHED
- raise NoMoreMessageError("No more message")
- else:
- return message
-
- def get_nowait(self) -> Any:
- try:
- if self._status is DelivererStatus.FINISHED:
- raise NoMoreMessageError("No more message")
-
- priority, message = self._messages.get_nowait()
-
- # check the status
- if self._status is DelivererStatus.CANCELLED:
- raise message
- elif message is self._END_SENTINEL:
- self._status = DelivererStatus.FINISHED
- raise NoMoreMessageError("No more message")
- else:
- return message
- except queue.Empty:
- raise EmptyMessageError("Message is empty")
-
- def __iter__(self):
- return self
-
- def __next__(self):
- """
- Returns the next request from the queue.
-
- :return: The next message.
- :rtype: Any
- :raises StopIteration: If no more messages are available.
- """
- while True:
- try:
- return self.get()
- except NoMoreMessageError:
- raise StopIteration
diff --git a/dubbo/common/node.py b/dubbo/common/node.py
deleted file mode 100644
index a5ec339..0000000
--- a/dubbo/common/node.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-
-from dubbo.common.url import URL
-
-__all__ = ["Node"]
-
-
-class Node(abc.ABC):
- """
- Abstract base class for a Node.
- """
-
- @abc.abstractmethod
- def get_url(self) -> URL:
- """
- Get the URL of the node.
-
- :return: The URL of the node.
- :rtype: URL
- :raises NotImplementedError: If the method is not implemented.
- """
- raise NotImplementedError("get_url() is not implemented.")
-
- @abc.abstractmethod
- def is_available(self) -> bool:
- """
- Check if the node is available.
-
- :return: True if the node is available, False otherwise.
- :rtype: bool
- :raises NotImplementedError: If the method is not implemented.
- """
- raise NotImplementedError("is_available() is not implemented.")
-
- @abc.abstractmethod
- def destroy(self) -> None:
- """
- Destroy the node.
-
- :raises NotImplementedError: If the method is not implemented.
- """
- raise NotImplementedError("destroy() is not implemented.")
diff --git a/dubbo/common/types.py b/dubbo/common/types.py
deleted file mode 100644
index 029b837..0000000
--- a/dubbo/common/types.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Any, Callable
-
-__all__ = ["SerializingFunction", "DeserializingFunction"]
-
-SerializingFunction = Callable[[Any], bytes]
-DeserializingFunction = Callable[[bytes], Any]
diff --git a/dubbo/common/url.py b/dubbo/common/url.py
deleted file mode 100644
index 581fd84..0000000
--- a/dubbo/common/url.py
+++ /dev/null
@@ -1,325 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-from typing import Any, Dict, Optional
-from urllib import parse
-from urllib.parse import urlencode, urlunparse
-
-from dubbo.common.constants import PROTOCOL_SEPARATOR
-
-__all__ = ["URL", "create_url"]
-
-
-def create_url(url: str, encoded: bool = False) -> "URL":
- """
- Creates a URL object from a URL string.
-
- This function takes a URL string and converts it into a URL object.
- If the 'encoded' parameter is set to True, the URL string will be decoded before being converted.
-
- :param url: The URL string to be converted into a URL object.
- :type url: str
- :param encoded: Determines if the URL string should be decoded before being converted. Defaults to False.
- :type encoded: bool
- :return: A URL object.
- :rtype: URL
- :raises ValueError: If the URL format is invalid.
- """
- # If the URL is encoded, decode it
- if encoded:
- url = parse.unquote(url)
-
- if PROTOCOL_SEPARATOR not in url:
- raise ValueError("Invalid URL format: missing protocol")
-
- parsed_url = parse.urlparse(url)
-
- if not parsed_url.scheme:
- raise ValueError("Invalid URL format: missing scheme.")
-
- return URL(
- parsed_url.scheme,
- parsed_url.hostname or "",
- parsed_url.port,
- parsed_url.username or "",
- parsed_url.password or "",
- parsed_url.path.lstrip("/"),
- {k: v[0] for k, v in parse.parse_qs(parsed_url.query).items()},
- )
-
-
-class URL:
- """
- URL - Uniform Resource Locator.
- """
-
- __slots__ = [
- "_scheme",
- "_host",
- "_port",
- "_location",
- "_username",
- "_password",
- "_path",
- "_parameters",
- "_attributes",
- ]
-
- def __init__(
- self,
- scheme: str,
- host: str,
- port: Optional[int] = None,
- username: str = "",
- password: str = "",
- path: str = "",
- parameters: Optional[Dict[str, str]] = None,
- attributes: Optional[Dict[str, Any]] = None,
- ):
- """
- Initialize the URL object.
-
- :param scheme: The scheme of the URL (e.g., 'http', 'https').
- :type scheme: str
- :param host: The host of the URL.
- :type host: str
- :param port: The port number of the URL, defaults to None.
- :type port: int, optional
- :param username: The username for authentication, defaults to an empty string.
- :type username: str, optional
- :param password: The password for authentication, defaults to an empty string.
- :type password: str, optional
- :param path: The path of the URL, defaults to an empty string.
- :type path: str, optional
- :param parameters: The query parameters of the URL as a dictionary, defaults to None.
- :type parameters: Dict[str, str], optional
- :param attributes: Additional attributes of the URL as a dictionary, defaults to None.
- :type attributes: Dict[str, Any], optional
- """
- self._scheme = scheme
- self._host = host
- self._port = port
- self._location = f"{host}:{port}" if port else host
- self._username = username
- self._password = password
- self._path = path
- self._parameters = parameters or {}
- self._attributes = attributes or {}
-
- @property
- def scheme(self) -> str:
- """
- Get or set the scheme of the URL.
-
- :return: The scheme of the URL.
- :rtype: str
- """
- return self._scheme
-
- @scheme.setter
- def scheme(self, value: str):
- self._scheme = value
-
- @property
- def host(self) -> str:
- """
- Get or set the host of the URL.
-
- :return: The host of the URL.
- :rtype: str
- """
- return self._host
-
- @host.setter
- def host(self, value: str):
- self._host = value
- self._location = f"{self.host}:{self.port}" if self.port else self.host
-
- @property
- def port(self) -> Optional[int]:
- """
- Get or set the port of the URL.
-
- :return: The port of the URL.
- :rtype: int, optional
- """
- return self._port
-
- @port.setter
- def port(self, value: int):
- if value > 0:
- self._port = value
- self._location = f"{self.host}:{self.port}"
-
- @property
- def location(self) -> str:
- """
- Get or set the location (host:port) of the URL.
-
- :return: The location of the URL.
- :rtype: str
- """
- return self._location
-
- @location.setter
- def location(self, value: str):
- try:
- values = value.split(":")
- self.host = values[0]
- if len(values) == 2:
- self.port = int(values[1])
- except Exception as e:
- raise ValueError(f"Invalid location: {value}") from e
-
- @property
- def username(self) -> str:
- """
- Get or set the username for authentication.
-
- :return: The username.
- :rtype: str
- """
- return self._username
-
- @username.setter
- def username(self, value: str):
- self._username = value
-
- @property
- def password(self) -> str:
- """
- Get or set the password for authentication.
-
- :return: The password.
- :rtype: str
- """
- return self._password
-
- @password.setter
- def password(self, value: str):
- self._password = value
-
- @property
- def path(self) -> str:
- """
- Get or set the path of the URL.
-
- :return: The path of the URL.
- :rtype: str
- """
- return self._path
-
- @path.setter
- def path(self, value: str):
- self._path = value.lstrip("/")
-
- @property
- def parameters(self) -> Dict[str, str]:
- """
- Get the query parameters of the URL.
-
- :return: The query parameters as a dictionary.
- :rtype: Dict[str, str]
- """
- return self._parameters
-
- @property
- def attributes(self) -> Dict[str, Any]:
- """
- Get the additional attributes of the URL.
-
- :return: The attributes as a dictionary.
- :rtype: Dict[str, Any]
- """
- return self._attributes
-
- def to_str(self, encode: bool = False) -> str:
- """
- Converts the URL to a string.
-
- :param encode: Determines if the URL should be encoded. Defaults to False.
- :type encode: bool
- :return: The URL string.
- :rtype: str
- """
- # Construct the netloc part
- if self.username and self.password:
- netloc = f"{self.username}:{self.password}@{self.host}"
- else:
- netloc = self.host
-
- if self.port:
- netloc = f"{netloc}:{self.port}"
-
- # Convert parameters dictionary to query string
- query = urlencode(self.parameters)
-
- # Construct the URL
- url = urlunparse((self.scheme or "", netloc, self.path or "/", "", query, ""))
-
- if encode:
- url = parse.quote(url)
-
- return url
-
- def copy(self) -> "URL":
- """
- Copy the URL object.
-
- :return: A shallow copy of the URL object.
- :rtype: URL
- """
- return copy.copy(self)
-
- def deepcopy(self) -> "URL":
- """
- Deep copy the URL object.
-
- :return: A deep copy of the URL object.
- :rtype: URL
- """
- return copy.deepcopy(self)
-
- def __copy__(self) -> "URL":
- return URL(
- self.scheme,
- self.host,
- self.port,
- self.username,
- self.password,
- self.path,
- self.parameters.copy(),
- self.attributes.copy(),
- )
-
- def __deepcopy__(self, memo) -> "URL":
- return URL(
- self.scheme,
- self.host,
- self.port,
- self.username,
- self.password,
- self.path,
- copy.deepcopy(self.parameters, memo),
- copy.deepcopy(self.attributes, memo),
- )
-
- def __str__(self) -> str:
- return self.to_str()
-
- def __repr__(self) -> str:
- return self.to_str()
diff --git a/dubbo/common/utils.py b/dubbo/common/utils.py
deleted file mode 100644
index 4b20998..0000000
--- a/dubbo/common/utils.py
+++ /dev/null
@@ -1,129 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-__all__ = ["EventHelper", "FutureHelper"]
-
-
-class EventHelper:
- """
- Helper class for event operations.
- """
-
- @staticmethod
- def is_set(event) -> bool:
- """
- Check if the event is set.
-
- :param event: Event object, you can use threading.Event or any other object that supports the is_set operation.
- :type event: Any
- :return: True if the event is set, or False if the is_set method is not supported or the event is invalid.
- :rtype: bool
- """
- return event.is_set() if event and hasattr(event, "is_set") else False
-
- @staticmethod
- def set(event) -> bool:
- """
- Attempt to set the event object.
-
- :param event: Event object, you can use threading.Event or any other object that supports the set operation.
- :type event: Any
- :return: True if the event was set, False otherwise
- (such as the event is invalid or does not support the set operation).
- :rtype: bool
- """
- if event is None:
- return False
-
- # If the event supports the set operation, set the event and return True
- if hasattr(event, "set"):
- event.set()
- return True
-
- # If the event is invalid or does not support the set operation, return False
- return False
-
- @staticmethod
- def clear(event) -> bool:
- """
- Attempt to clear the event object.
-
- :param event: Event object, you can use threading.Event or any other object that supports the clear operation.
- :type event: Any
- :return: True if the event was cleared, False otherwise
- (such as the event is invalid or does not support the clear operation).
- :rtype: bool
- """
- if not event:
- return False
-
- # If the event supports the clear operation, clear the event and return True
- if hasattr(event, "clear"):
- event.clear()
- return True
-
- # If the event is invalid or does not support the clear operation, return False
- return False
-
-
-class FutureHelper:
- """
- Helper class for future operations.
- """
-
- @staticmethod
- def done(future) -> bool:
- """
- Check if the future is done.
-
- :param future: Future object
- :type future: Any
- :return: True if the future is done, False otherwise.
- :rtype: bool
- """
- return future.done() if future and hasattr(future, "done") else False
-
- @staticmethod
- def set_result(future, result):
- """
- Set the result of the future.
-
- :param future: Future object
- :type future: Any
- :param result: Result to set
- :type result: Any
- """
- if not future or FutureHelper.done(future):
- return
-
- if hasattr(future, "set_result"):
- future.set_result(result)
-
- @staticmethod
- def set_exception(future, exception):
- """
- Set the exception to the future.
-
- :param future: Future object
- :type future: Any
- :param exception: Exception to set
- :type exception: Exception
- """
- if not future or FutureHelper.done(future):
- return
-
- if hasattr(future, "set_exception"):
- future.set_exception(exception)
diff --git a/dubbo/config/__init__.py b/dubbo/config/__init__.py
deleted file mode 100644
index 7ffd615..0000000
--- a/dubbo/config/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from .logger_config import FileLoggerConfig, LoggerConfig
-from .protocol_config import ProtocolConfig
-from .reference_config import ReferenceConfig
diff --git a/dubbo/config/logger_config.py b/dubbo/config/logger_config.py
deleted file mode 100644
index ecae584..0000000
--- a/dubbo/config/logger_config.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from dataclasses import dataclass
-from typing import Dict, Optional
-
-from dubbo.common.url import URL
-from dubbo.extension import extensionLoader
-from dubbo.logger import LoggerAdapter
-from dubbo.logger import constants as logger_constants
-from dubbo.logger import loggerFactory
-from dubbo.logger.constants import Level
-
-
-@dataclass
-class FileLoggerConfig:
- """
- File logger configuration.
- :param rotate: File rotate type.
- :type rotate: logger_constants.FileRotateType
- :param file_formatter: File formatter.
- :type file_formatter: Optional[str]
- :param file_dir: File directory.
- :type file_dir: str
- :param file_name: File name.
- :type file_name: str
- :param backup_count: Backup count.
- :type backup_count: int
- :param max_bytes: Max bytes.
- :type max_bytes: int
- :param interval: Interval.
- :type interval: int
- """
-
- rotate: logger_constants.FileRotateType = logger_constants.FileRotateType.NONE
- file_formatter: Optional[str] = None
- file_dir: str = logger_constants.DEFAULT_FILE_DIR_VALUE
- file_name: str = logger_constants.DEFAULT_FILE_NAME_VALUE
- backup_count: int = logger_constants.DEFAULT_FILE_BACKUP_COUNT_VALUE
- max_bytes: int = logger_constants.DEFAULT_FILE_MAX_BYTES_VALUE
- interval: int = logger_constants.DEFAULT_FILE_INTERVAL_VALUE
-
- def check(self) -> None:
- if self.rotate == logger_constants.FileRotateType.SIZE and self.max_bytes < 0:
- raise ValueError("Max bytes can't be less than 0")
- elif self.rotate == logger_constants.FileRotateType.TIME and self.interval < 1:
- raise ValueError("Interval can't be less than 1")
-
- def dict(self) -> Dict[str, str]:
- return {
- logger_constants.FILE_DIR_KEY: self.file_dir,
- logger_constants.FILE_NAME_KEY: self.file_name,
- logger_constants.FILE_ROTATE_KEY: self.rotate.value,
- logger_constants.FILE_MAX_BYTES_KEY: str(self.max_bytes),
- logger_constants.FILE_INTERVAL_KEY: str(self.interval),
- logger_constants.FILE_BACKUP_COUNT_KEY: str(self.backup_count),
- }
-
-
-class LoggerConfig:
- """
- Logger configuration.
- """
-
- __slots__ = [
- "_driver",
- "_level",
- "_console_enabled",
- "_console_config",
- "_file_enabled",
- "_file_config",
- ]
-
- def __init__(
- self,
- driver,
- level: Level,
- console_enabled: bool,
- file_enabled: bool,
- file_config: FileLoggerConfig,
- ):
- """
- Initialize the logger configuration.
- :param driver: The logger driver.
- :type driver: str
- :param level: The logger level.
- :type level: Level
- :param console_enabled: Whether to enable console logger.
- :type console_enabled: bool
- :param file_enabled: Whether to enable file logger.
- :type file_enabled: bool
- :param file_config: The file logger configuration.
- :type file_config: FileLogger
- """
- # set global config
- self._driver = driver
- self._level = level
- # set console config
- self._console_enabled = console_enabled
- # set file comfig
- self._file_enabled = file_enabled
- self._file_config = file_config
- if file_enabled:
- self._file_config.check()
-
- def get_url(self) -> URL:
- # get LoggerConfig parameters
- parameters = {
- logger_constants.DRIVER_KEY: self._driver,
- logger_constants.LEVEL_KEY: self._level.value,
- logger_constants.CONSOLE_ENABLED_KEY: str(self._console_enabled),
- logger_constants.FILE_ENABLED_KEY: str(self._file_enabled),
- **self._file_config.dict(),
- }
-
- return URL(scheme=self._driver, host=self._level.value, parameters=parameters)
-
- def init(self):
- # get logger_adapter and initialize loggerFactory
- logger_adapter_class = extensionLoader.get_extension(
- LoggerAdapter, self._driver
- )
- logger_adapter = logger_adapter_class(self.get_url())
- loggerFactory.set_logger_adapter(logger_adapter)
-
- @classmethod
- def default_config(cls):
- """
- Get default logger configuration.
- """
- return LoggerConfig(
- driver=logger_constants.DEFAULT_DRIVER_VALUE,
- level=logger_constants.DEFAULT_LEVEL_VALUE,
- console_enabled=logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE,
- file_enabled=logger_constants.DEFAULT_FILE_ENABLED_VALUE,
- file_config=FileLoggerConfig(),
- )
diff --git a/dubbo/config/protocol_config.py b/dubbo/config/protocol_config.py
deleted file mode 100644
index d629e1f..0000000
--- a/dubbo/config/protocol_config.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class ProtocolConfig:
-
- _name: str
-
- __slots__ = ["_name"]
-
- @property
- def name(self) -> str:
- return self._name
-
- @name.setter
- def name(self, value: str):
- self._name = value
diff --git a/dubbo/config/reference_config.py b/dubbo/config/reference_config.py
deleted file mode 100644
index a7f258c..0000000
--- a/dubbo/config/reference_config.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-from typing import Optional, Union
-
-from dubbo.common import URL, create_url
-from dubbo.extension import extensionLoader
-from dubbo.protocol import Invoker, Protocol
-
-
-class ReferenceConfig:
-
- __slots__ = [
- "_initialized",
- "_global_lock",
- "_service_name",
- "_url",
- "_protocol",
- "_invoker",
- ]
-
- def __init__(self, url: Union[str, URL], service_name: str):
- self._initialized = False
- self._global_lock = threading.Lock()
- self._url: URL = url if isinstance(url, URL) else create_url(url)
- self._service_name = service_name
- self._protocol: Optional[Protocol] = None
- self._invoker: Optional[Invoker] = None
-
- def get_invoker(self) -> Invoker:
- if not self._invoker:
- self._do_init()
- return self._invoker
-
- def _do_init(self):
- with self._global_lock:
- if self._initialized:
- return
- # Get the interface name from the URL path
- self._url.path = self._service_name
- self._protocol = extensionLoader.get_extension(Protocol, self._url.scheme)(
- self._url
- )
- self._create_invoker()
- self._initialized = True
-
- def _create_invoker(self):
- self._invoker = self._protocol.refer(self._url)
diff --git a/dubbo/config/service_config.py b/dubbo/config/service_config.py
deleted file mode 100644
index a4f3644..0000000
--- a/dubbo/config/service_config.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Optional
-
-from dubbo.common import URL
-from dubbo.common import constants as common_constants
-from dubbo.extension import extensionLoader
-from dubbo.protocol import Protocol
-from dubbo.proxy.handlers import RpcServiceHandler
-
-__all__ = ["ServiceConfig"]
-
-
-class ServiceConfig:
- """
- Service configuration
- """
-
- def __init__(
- self,
- service_handler: RpcServiceHandler,
- port: Optional[int] = None,
- protocol: Optional[str] = None,
- ):
-
- self._service_handler = service_handler
- self._port = port or common_constants.DEFAULT_PORT
-
- protocol_str = protocol or common_constants.TRIPLE_SHORT
-
- self._export_url = URL(
- protocol_str, common_constants.LOCAL_HOST_KEY, self._port
- )
- self._export_url.attributes[common_constants.SERVICE_HANDLER_KEY] = (
- service_handler
- )
-
- self._protocol: Protocol = extensionLoader.get_extension(
- Protocol, protocol_str
- )(self._export_url)
-
- self._exported = False
- self._exporting = False
-
- def export(self):
- """
- Export service
- """
- if self._exporting or self._exported:
- return
-
- self._exporting = True
- try:
- self._protocol.export(self._export_url)
- self._exported = True
- finally:
- self._exporting = False
diff --git a/dubbo/extension/registries.py b/dubbo/extension/registries.py
index 37c7bc7..f98974f 100644
--- a/dubbo/extension/registries.py
+++ b/dubbo/extension/registries.py
@@ -62,6 +62,7 @@ loadBalanceRegistry = ExtendedRegistry(
interface=LoadBalance,
impls={
"random": "dubbo.cluster.loadbalances.RandomLoadBalance",
+ "cpu": "dubbo.cluster.loadbalances.CpuLoadBalance",
},
)
diff --git a/dubbo/logger/__init__.py b/dubbo/logger/__init__.py
deleted file mode 100644
index 4f42594..0000000
--- a/dubbo/logger/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._interfaces import Logger, LoggerAdapter
-from .logger_factory import LoggerFactory as _LoggerFactory
-
-# The logger factory instance.
-loggerFactory = _LoggerFactory()
-
-__all__ = ["Logger", "LoggerAdapter", "loggerFactory"]
diff --git a/dubbo/logger/_interfaces.py b/dubbo/logger/_interfaces.py
deleted file mode 100644
index 88fa999..0000000
--- a/dubbo/logger/_interfaces.py
+++ /dev/null
@@ -1,204 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-from typing import Any
-
-from dubbo.common.url import URL
-
-from .constants import Level
-
-_all__ = ["Logger", "LoggerAdapter"]
-
-
-class Logger(abc.ABC):
- """
- Logger Interface, which is used to log messages.
- """
-
- @abc.abstractmethod
- def log(self, level: Level, msg: str, *args: Any, **kwargs: Any) -> None:
- """
- Log a message at the specified logging level.
-
- :param level: The logging level.
- :type level: Level
- :param msg: The log message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def debug(self, msg: str, *args, **kwargs) -> None:
- """
- Log a debug message.
-
- :param msg: The debug message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def info(self, msg: str, *args, **kwargs) -> None:
- """
- Log an info message.
-
- :param msg: The info message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def warning(self, msg: str, *args, **kwargs) -> None:
- """
- Log a warning message.
-
- :param msg: The warning message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def error(self, msg: str, *args, **kwargs) -> None:
- """
- Log an error message.
-
- :param msg: The error message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def critical(self, msg: str, *args, **kwargs) -> None:
- """
- Log a critical message.
-
- :param msg: The critical message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def fatal(self, msg: str, *args, **kwargs) -> None:
- """
- Log a fatal message.
-
- :param msg: The fatal message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def exception(self, msg: str, *args, **kwargs) -> None:
- """
- Log an exception message.
-
- :param msg: The exception message.
- :type msg: str
- :param args: Additional positional arguments.
- :type args: Any
- :param kwargs: Additional keyword arguments.
- :type kwargs: Any
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def is_enabled_for(self, level: Level) -> bool:
- """
- Check if this logger is enabled for the specified level.
-
- :param level: The logging level.
- :type level: Level
- :return: Whether the logging level is enabled.
- :rtype: bool
- """
- raise NotImplementedError()
-
-
-class LoggerAdapter(abc.ABC):
- """
- Logger Adapter Interface, which is used to support different logging libraries.
- """
-
- __slots__ = ["_config"]
-
- def __init__(self, config: URL):
- """
- Initialize the logger adapter.
-
- :param config: The configuration of the logger adapter.
- :type config: URL
- """
- self._config = config
-
- def get_logger(self, name: str) -> Logger:
- """
- Get a logger by name.
-
- :param name: The name of the logger.
- :type name: str
- :return: An instance of the logger.
- :rtype: Logger
- """
- raise NotImplementedError()
-
- @property
- def level(self) -> Level:
- """
- Get the current logging level.
-
- :return: The current logging level.
- :rtype: Level
- """
- raise NotImplementedError()
-
- @level.setter
- def level(self, level: Level) -> None:
- """
- Set the logging level.
-
- :param level: The logging level to set.
- :type level: Level
- """
- raise NotImplementedError()
diff --git a/dubbo/logger/constants.py b/dubbo/logger/constants.py
deleted file mode 100644
index a6cae5d..0000000
--- a/dubbo/logger/constants.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import enum
-import os
-
-__all__ = [
- "Level",
- "FileRotateType",
- "LEVEL_KEY",
- "DRIVER_KEY",
- "CONSOLE_ENABLED_KEY",
- "FILE_ENABLED_KEY",
- "FILE_DIR_KEY",
- "FILE_NAME_KEY",
- "FILE_ROTATE_KEY",
- "FILE_MAX_BYTES_KEY",
- "FILE_INTERVAL_KEY",
- "FILE_BACKUP_COUNT_KEY",
- "DEFAULT_DRIVER_VALUE",
- "DEFAULT_LEVEL_VALUE",
- "DEFAULT_CONSOLE_ENABLED_VALUE",
- "DEFAULT_FILE_ENABLED_VALUE",
- "DEFAULT_FILE_DIR_VALUE",
- "DEFAULT_FILE_NAME_VALUE",
- "DEFAULT_FILE_MAX_BYTES_VALUE",
- "DEFAULT_FILE_INTERVAL_VALUE",
- "DEFAULT_FILE_BACKUP_COUNT_VALUE",
-]
-
-
-@enum.unique
-class Level(enum.Enum):
- """
- The logging level enum.
-
- :cvar DEBUG: Debug level.
- :cvar INFO: Info level.
- :cvar WARNING: Warning level.
- :cvar ERROR: Error level.
- :cvar CRITICAL: Critical level.
- :cvar FATAL: Fatal level.
- :cvar UNKNOWN: Unknown level.
- """
-
- DEBUG = "DEBUG"
- INFO = "INFO"
- WARNING = "WARNING"
- ERROR = "ERROR"
- CRITICAL = "CRITICAL"
- FATAL = "FATAL"
- UNKNOWN = "UNKNOWN"
-
- @classmethod
- def get_level(cls, level_value: str) -> "Level":
- """
- Get the level from the level value.
-
- :param level_value: The level value.
- :type level_value: str
- :return: The level. If the level value is invalid, return UNKNOWN.
- :rtype: Level
- """
- level_value = level_value.upper()
- for level in cls:
- if level_value == level.value:
- return level
- return cls.UNKNOWN
-
-
-@enum.unique
-class FileRotateType(enum.Enum):
- """
- The file rotating type enum.
-
- :cvar NONE: No rotating.
- :cvar SIZE: Rotate the file by size.
- :cvar TIME: Rotate the file by time.
- """
-
- NONE = "NONE"
- SIZE = "SIZE"
- TIME = "TIME"
-
-
-"""logger config keys"""
-# global config
-LEVEL_KEY = "logger.level"
-DRIVER_KEY = "logger.driver"
-
-# console config
-CONSOLE_ENABLED_KEY = "logger.console.enable"
-
-# file logger
-FILE_ENABLED_KEY = "logger.file.enable"
-FILE_DIR_KEY = "logger.file.dir"
-FILE_NAME_KEY = "logger.file.name"
-FILE_ROTATE_KEY = "logger.file.rotate"
-FILE_MAX_BYTES_KEY = "logger.file.maxbytes"
-FILE_INTERVAL_KEY = "logger.file.interval"
-FILE_BACKUP_COUNT_KEY = "logger.file.backupcount"
-
-"""some logger default value"""
-DEFAULT_DRIVER_VALUE = "logging"
-DEFAULT_LEVEL_VALUE = Level.DEBUG
-# console
-DEFAULT_CONSOLE_ENABLED_VALUE = True
-# file
-DEFAULT_FILE_ENABLED_VALUE = False
-DEFAULT_FILE_DIR_VALUE = os.path.expanduser("~")
-DEFAULT_FILE_NAME_VALUE = "dubbo.log"
-DEFAULT_FILE_MAX_BYTES_VALUE = 10 * 1024 * 1024
-DEFAULT_FILE_INTERVAL_VALUE = 1
-DEFAULT_FILE_BACKUP_COUNT_VALUE = 10
diff --git a/dubbo/logger/logger_factory.py b/dubbo/logger/logger_factory.py
deleted file mode 100644
index 0a7d0b2..0000000
--- a/dubbo/logger/logger_factory.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-from typing import Dict, Optional
-
-from dubbo.common import SingletonBase
-from dubbo.common.url import URL
-from dubbo.logger import Logger, LoggerAdapter
-from dubbo.logger import constants as logger_constants
-from dubbo.logger.constants import Level
-
-__all__ = ["LoggerFactory"]
-
-# Default logger config with default values.
-_DEFAULT_CONFIG = URL(
- scheme=logger_constants.DEFAULT_DRIVER_VALUE,
- host=logger_constants.DEFAULT_LEVEL_VALUE.value,
- parameters={
- logger_constants.DRIVER_KEY: logger_constants.DEFAULT_DRIVER_VALUE,
- logger_constants.LEVEL_KEY: logger_constants.DEFAULT_LEVEL_VALUE.value,
- logger_constants.CONSOLE_ENABLED_KEY: str(
- logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE
- ),
- logger_constants.FILE_ENABLED_KEY: str(
- logger_constants.DEFAULT_FILE_ENABLED_VALUE
- ),
- },
-)
-
-
-class LoggerFactory(SingletonBase):
- """
- Singleton factory class for creating and managing loggers.
-
- This class ensures a single instance of the logger factory, provides
methods to set and get
- logger adapters, and manages logger instances.
- """
-
- def __init__(self):
- """
- Initialize the logger factory.
-
- This method sets up the internal lock, logger adapter, and logger
cache.
- """
- self._lock = threading.RLock()
- self._logger_adapter: Optional[LoggerAdapter] = None
- self._loggers: Dict[str, Logger] = {}
-
- def _ensure_logger_adapter(self) -> None:
- """
- Ensure the logger adapter is set.
-
- If the logger adapter is not set, this method sets it to the default
adapter.
- """
- if not self._logger_adapter:
- with self._lock:
- if not self._logger_adapter:
- # Import here to avoid circular imports
- from dubbo.logger.logging.logger_adapter import
LoggingLoggerAdapter
-
-
self.set_logger_adapter(LoggingLoggerAdapter(_DEFAULT_CONFIG))
-
- def set_logger_adapter(self, logger_adapter: LoggerAdapter) -> None:
- """
- Set the logger adapter.
-
- :param logger_adapter: The new logger adapter to use.
- :type logger_adapter: LoggerAdapter
- """
- with self._lock:
- self._logger_adapter = logger_adapter
- # Update all loggers
- self._loggers = {
- name: self._logger_adapter.get_logger(name) for name in
self._loggers
- }
-
- def get_logger_adapter(self) -> LoggerAdapter:
- """
- Get the current logger adapter.
-
- :return: The current logger adapter.
- :rtype: LoggerAdapter
- """
- self._ensure_logger_adapter()
- return self._logger_adapter
-
- def get_logger(self, name: str) -> Logger:
- """
- Get the logger by name.
-
- :param name: The name of the logger to retrieve.
- :type name: str
- :return: An instance of the requested logger.
- :rtype: Logger
- """
- self._ensure_logger_adapter()
- logger = self._loggers.get(name)
- if not logger:
- with self._lock:
- if name not in self._loggers:
- self._loggers[name] = self._logger_adapter.get_logger(name)
- logger = self._loggers[name]
- return logger
-
- def get_level(self) -> Level:
- """
- Get the current logging level.
-
- :return: The current logging level.
- :rtype: Level
- """
- self._ensure_logger_adapter()
- return self._logger_adapter.level
diff --git a/dubbo/logger/logging/formatter.py
b/dubbo/logger/logging/formatter.py
deleted file mode 100644
index 1dc409e..0000000
--- a/dubbo/logger/logging/formatter.py
+++ /dev/null
@@ -1,89 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import re
-from enum import Enum
-
-__all__ = ["ColorFormatter", "NoColorFormatter", "Colors"]
-
-
-class Colors(Enum):
- """
- Colors for log messages.
- """
-
- END = "\033[0m"
- BOLD = "\033[1m"
- BLUE = "\033[34m"
- GREEN = "\033[32m"
- PURPLE = "\033[35m"
- CYAN = "\033[36m"
- RED = "\033[31m"
- YELLOW = "\033[33m"
- GREY = "\033[38;5;240m"
-
-
-LEVEL_MAP = {
- "DEBUG": Colors.BLUE.value,
- "INFO": Colors.GREEN.value,
- "WARNING": Colors.YELLOW.value,
- "ERROR": Colors.RED.value,
- "CRITICAL": Colors.RED.value + Colors.BOLD.value,
-}
-
-DATE_FORMAT: str = "%Y-%m-%d %H:%M:%S"
-
-LOG_FORMAT: str = (
- f"{Colors.GREEN.value}%(asctime)s{Colors.END.value}"
- " | "
- f"%(level_color)s%(levelname)s{Colors.END.value}"
- " | "
- f"{Colors.CYAN.value}%(module)s:%(funcName)s:%(lineno)d{Colors.END.value}"
- " - "
- f"{Colors.PURPLE.value}[Dubbo]{Colors.END.value} "
- f"%(msg_color)s%(message)s{Colors.END.value}"
-)
-
-
-class ColorFormatter(logging.Formatter):
- """
- A formatter with color.
- It will format the log message like this:
- 2024-06-24 16:39:57 | DEBUG | test_logger_factory:test_with_config:44 -
[Dubbo] debug log
- """
-
- def __init__(self):
- self.log_format = LOG_FORMAT
- super().__init__(self.log_format, DATE_FORMAT)
-
- def format(self, record) -> str:
- levelname = record.levelname
- record.level_color = record.msg_color = LEVEL_MAP.get(levelname)
- return super().format(record)
-
-
-class NoColorFormatter(logging.Formatter):
- """
- A formatter without color.
- It will format the log message like this:
- 2024-06-24 16:39:57 | DEBUG | test_logger_factory:test_with_config:44 -
[Dubbo] debug log
- """
-
- def __init__(self):
- color_re = re.compile(r"\033\[[0-9;]*\w|%\((msg_color|level_color)\)s")
- self.log_format = color_re.sub("", LOG_FORMAT)
- super().__init__(self.log_format, DATE_FORMAT)
diff --git a/dubbo/logger/logging/logger.py b/dubbo/logger/logging/logger.py
deleted file mode 100644
index d8feb77..0000000
--- a/dubbo/logger/logging/logger.py
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import Dict
-
-from dubbo.logger import Logger
-
-from ..constants import Level
-
-__all__ = ["LoggingLogger"]
-
-# The mapping from the logging level to the logging level.
-LEVEL_MAP: Dict[Level, int] = {
- Level.DEBUG: logging.DEBUG,
- Level.INFO: logging.INFO,
- Level.WARNING: logging.WARNING,
- Level.ERROR: logging.ERROR,
- Level.CRITICAL: logging.CRITICAL,
- Level.FATAL: logging.FATAL,
-}
-
-STACKLEVEL_KEY = "stacklevel"
-STACKLEVEL_DEFAULT = 1
-STACKLEVEL_OFFSET = 2
-
-EXC_INFO_KEY = "exc_info"
-EXC_INFO_DEFAULT = True
-
-
-class LoggingLogger(Logger):
- """
- The logging logger implementation.
- """
-
- __slots__ = ["_logger"]
-
- def __init__(self, internal_logger: logging.Logger):
- """
- Initialize the logger.
- :param internal_logger: The internal logger.
- :type internal_logger: logging
- """
- self._logger = internal_logger
-
- def _log(self, level: int, msg: str, *args, **kwargs) -> None:
- # Add the stacklevel to the keyword arguments.
- kwargs[STACKLEVEL_KEY] = (
- kwargs.get(STACKLEVEL_KEY, STACKLEVEL_DEFAULT) + STACKLEVEL_OFFSET
- )
- self._logger.log(level, msg, *args, **kwargs)
-
- def log(self, level: Level, msg: str, *args, **kwargs) -> None:
- self._log(LEVEL_MAP[level], msg, *args, **kwargs)
-
- def debug(self, msg: str, *args, **kwargs) -> None:
- self._log(logging.DEBUG, msg, *args, **kwargs)
-
- def info(self, msg: str, *args, **kwargs) -> None:
- self._log(logging.INFO, msg, *args, **kwargs)
-
- def warning(self, msg: str, *args, **kwargs) -> None:
- self._log(logging.WARNING, msg, *args, **kwargs)
-
- def error(self, msg: str, *args, **kwargs) -> None:
- self._log(logging.ERROR, msg, *args, **kwargs)
-
- def critical(self, msg: str, *args, **kwargs) -> None:
- self._log(logging.CRITICAL, msg, *args, **kwargs)
-
- def fatal(self, msg: str, *args, **kwargs) -> None:
- self._log(logging.FATAL, msg, *args, **kwargs)
-
- def exception(self, msg: str, *args, **kwargs) -> None:
- if kwargs.get(EXC_INFO_KEY) is None:
- kwargs[EXC_INFO_KEY] = EXC_INFO_DEFAULT
- self.error(msg, *args, **kwargs)
-
- def is_enabled_for(self, level: Level) -> bool:
- logging_level = LEVEL_MAP.get(level)
- return self._logger.isEnabledFor(logging_level) if logging_level else
False
diff --git a/dubbo/logger/logging/logger_adapter.py
b/dubbo/logger/logging/logger_adapter.py
deleted file mode 100644
index 3e60813..0000000
--- a/dubbo/logger/logging/logger_adapter.py
+++ /dev/null
@@ -1,186 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-import sys
-from functools import cache
-from logging import handlers
-
-from dubbo.common import constants as common_constants
-from dubbo.common.url import URL
-from dubbo.logger import Logger, LoggerAdapter
-from dubbo.logger import constants as logger_constants
-from dubbo.logger.constants import LEVEL_KEY, Level
-from dubbo.logger.logging import formatter
-from dubbo.logger.logging.logger import LoggingLogger
-
-"""This module provides the logging logger implementation. -> logging module"""
-
-__all__ = ["LoggingLoggerAdapter"]
-
-
-class LoggingLoggerAdapter(LoggerAdapter):
- """
- Internal logger adapter responsible for creating loggers and encapsulating
the logging.getLogger() method.
- """
-
- __slots__ = ["_level"]
-
- def __init__(self, config: URL):
- """
- Initialize the LoggingLoggerAdapter with the given configuration.
-
- :param config: The configuration URL for the logger adapter.
- :type config: URL
- """
- super().__init__(config)
- # Set level
- level_name = config.parameters.get(LEVEL_KEY)
- self._level = Level.get_level(level_name) if level_name else
Level.DEBUG
- self._update_level()
-
- def get_logger(self, name: str) -> Logger:
- """
- Create a logger instance by name.
-
- :param name: The logger name.
- :type name: str
- :return: An instance of the logger.
- :rtype: Logger
- """
- logger_instance = logging.getLogger(name)
- # clean up handlers
- logger_instance.handlers.clear()
-
- # Add console handler
- console_enabled = self._config.parameters.get(
- logger_constants.CONSOLE_ENABLED_KEY,
- str(logger_constants.DEFAULT_CONSOLE_ENABLED_VALUE),
- )
- if console_enabled.lower() == common_constants.TRUE_VALUE or bool(
- sys.stdout and sys.stdout.isatty()
- ):
- logger_instance.addHandler(self._get_console_handler())
-
- # Add file handler
- file_enabled = self._config.parameters.get(
- logger_constants.FILE_ENABLED_KEY,
- str(logger_constants.DEFAULT_FILE_ENABLED_VALUE),
- )
- if file_enabled.lower() == common_constants.TRUE_VALUE:
- logger_instance.addHandler(self._get_file_handler())
-
- if not logger_instance.handlers:
- # It's intended to be used to avoid the "No handlers could be
found for logger XXX" one-off warning.
- logger_instance.addHandler(logging.NullHandler())
-
- return LoggingLogger(logger_instance)
-
- @cache
- def _get_console_handler(self) -> logging.StreamHandler:
- """
- Get the console handler, avoiding duplicate creation with caching.
-
- :return: The console handler.
- :rtype: logging.StreamHandler
- """
- console_handler = logging.StreamHandler()
- console_handler.setFormatter(formatter.ColorFormatter())
-
- return console_handler
-
- @cache
- def _get_file_handler(self) -> logging.Handler:
- """
- Get the file handler, avoiding duplicate creation with caching.
-
- :return: The file handler.
- :rtype: logging.Handler
- """
- # Get file path
- file_dir = self._config.parameters.get(logger_constants.FILE_DIR_KEY)
- file_name = self._config.parameters.get(
- logger_constants.FILE_NAME_KEY,
logger_constants.DEFAULT_FILE_NAME_VALUE
- )
- file_path = os.path.join(file_dir, file_name)
- # Get backup count
- backup_count = int(
- self._config.parameters.get(
- logger_constants.FILE_BACKUP_COUNT_KEY,
- logger_constants.DEFAULT_FILE_BACKUP_COUNT_VALUE,
- )
- )
- # Get rotate type
- rotate_type =
self._config.parameters.get(logger_constants.FILE_ROTATE_KEY)
-
- # Set file Handler
- file_handler: logging.Handler
- if rotate_type == logger_constants.FileRotateType.SIZE.value:
- # Set RotatingFileHandler
- max_bytes = int(
-
self._config.parameters.get(logger_constants.FILE_MAX_BYTES_KEY)
- )
- file_handler = handlers.RotatingFileHandler(
- file_path, maxBytes=max_bytes, backupCount=backup_count
- )
- elif rotate_type == logger_constants.FileRotateType.TIME.value:
- # Set TimedRotatingFileHandler
- interval = int(
- self._config.parameters.get(logger_constants.FILE_INTERVAL_KEY)
- )
- file_handler = handlers.TimedRotatingFileHandler(
- file_path, interval=interval, backupCount=backup_count
- )
- else:
- # Set FileHandler
- file_handler = logging.FileHandler(file_path)
-
- # Add file_handler
- file_handler.setFormatter(formatter.NoColorFormatter())
- return file_handler
-
- @property
- def level(self) -> Level:
- """
- Get the logging level.
-
- :return: The current logging level.
- :rtype: Level
- """
- return self._level
-
- @level.setter
- def level(self, level: Level) -> None:
- """
- Set the logging level.
-
- :param level: The logging level to set.
- :type level: Level
- """
- if level == self._level or level is None:
- return
- self._level = level
- self._update_level()
-
- def _update_level(self):
- """
- Update the log level by modifying the root logger.
- """
- # Get the root logger
- root_logger = logging.getLogger()
- # Set the logging level
- root_logger.setLevel(self._level.value)
diff --git a/dubbo/protocol/triple/invoker.py b/dubbo/protocol/triple/invoker.py
index 95c6147..de93e94 100644
--- a/dubbo/protocol/triple/invoker.py
+++ b/dubbo/protocol/triple/invoker.py
@@ -47,7 +47,7 @@ class TripleInvoker(Invoker):
Triple invoker.
"""
- __slots__ = ["_url", "_client", "_stream_multiplexer", "_compression",
"_destroyed"]
+ __slots__ = ["_url", "_client", "_stream_multiplexer", "_compression"]
def __init__(
self, url: URL, client: Client, stream_multiplexer:
StreamClientMultiplexHandler
@@ -56,8 +56,6 @@ class TripleInvoker(Invoker):
self._client = client
self._stream_multiplexer = stream_multiplexer
- self._destroyed = False
-
def invoke(self, invocation: RpcInvocation) -> Result:
call_type: CallType =
invocation.get_attribute(common_constants.CALL_KEY)
result = TriResult(call_type)
@@ -202,10 +200,6 @@ class TripleInvoker(Invoker):
def is_available(self) -> bool:
return self._client.is_connected()
- @property
- def destroyed(self) -> bool:
- return self._destroyed
-
def destroy(self) -> None:
self._client.close()
self._client = None
diff --git a/dubbo/protocol/triple/protocol.py
b/dubbo/protocol/triple/protocol.py
index 102b552..b9896c2 100644
--- a/dubbo/protocol/triple/protocol.py
+++ b/dubbo/protocol/triple/protocol.py
@@ -61,11 +61,13 @@ class TripleProtocol(Protocol):
if self._server is not None:
return
- service_handler: RpcServiceHandler = url.attributes[
- common_constants.SERVICE_HANDLER_KEY
- ]
+ service_handler = url.attributes[common_constants.SERVICE_HANDLER_KEY]
- self._path_resolver[service_handler.service_name] = service_handler
+ if iter(service_handler):
+ for handler in service_handler:
+ self._path_resolver[handler.service_name] = handler
+ else:
+ self._path_resolver[service_handler.service_name] = service_handler
method_executor = ThreadPoolExecutor(
thread_name_prefix=f"dubbo_tri_method_{str(uuid.uuid4())}",
max_workers=10
diff --git a/dubbo/registry/protocol.py b/dubbo/registry/protocol.py
index 2a13764..ebee40f 100644
--- a/dubbo/registry/protocol.py
+++ b/dubbo/registry/protocol.py
@@ -14,9 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from dubbo.cluster import Directory
from dubbo.cluster.directories import RegistryDirectory
from dubbo.cluster.failfast_cluster import FailfastCluster
+from dubbo.cluster.monitor.cpu import CpuMonitor, CpuInnerRpcHandler
from dubbo.configs import RegistryConfig
from dubbo.constants import common_constants
from dubbo.extension import extensionLoader
@@ -44,8 +44,15 @@ class RegistryProtocol(Protocol):
# get the server registry
registry = self._factory.get_registry(url)
- ref_url = url.attributes[common_constants.EXPORT_KEY]
+ ref_url: URL = url.attributes[common_constants.EXPORT_KEY]
registry.register(ref_url)
+
+ # add cpu handler
+ ref_url.attributes[common_constants.SERVICE_HANDLER_KEY] = [
+ ref_url.attributes[common_constants.SERVICE_HANDLER_KEY],
+ CpuInnerRpcHandler.get_service_handler(),
+ ]
+
# continue the export process
self._protocol.export(ref_url)
@@ -53,7 +60,10 @@ class RegistryProtocol(Protocol):
registry = self._factory.get_registry(url)
# create the directory
- directory: Directory = RegistryDirectory(registry, self._protocol, url)
+ if url.parameters.get(common_constants.LOADBALANCE_KEY) == "cpu":
+ directory = CpuMonitor(registry, self._protocol, url)
+ else:
+ directory = RegistryDirectory(registry, self._protocol, url)
# continue the refer process
return FailfastCluster().join(directory)
diff --git a/dubbo/remoting/aio/http2/controllers.py
b/dubbo/remoting/aio/http2/controllers.py
index 6642ecf..d52e2df 100644
--- a/dubbo/remoting/aio/http2/controllers.py
+++ b/dubbo/remoting/aio/http2/controllers.py
@@ -349,6 +349,12 @@ class FrameOutboundController(Controller):
"""
def _inner_operation(_frame: UserActionFrames):
+
+ # -1 means the stream is not created, so we don't need to send the
reset frame
+ if self._stream.id == -1:
+ return
+
+ _frame.stream_id = self._stream.id
self._protocol.send_frame(_frame, self._stream)
self._stream.close_local()
@@ -376,6 +382,7 @@ class FrameOutboundController(Controller):
# wait and send the data frames
while True:
frame = await self._data_queue.get()
+ frame.stream_id = self._stream.id
if frame is not FrameOutboundController.LAST_DATA_FRAME:
self._data_sent_event = asyncio.Event()
self._protocol.send_frame(frame, self._stream,
self._data_sent_event)
@@ -388,6 +395,7 @@ class FrameOutboundController(Controller):
# wait for the last data frame and send the trailers frame
await self._data_sent_event.wait()
+ self._trailers.stream_id = self._stream.id
self._protocol.send_frame(self._trailers, self._stream)
# close the stream
diff --git a/dubbo/remoting/aio/http2/frames.py
b/dubbo/remoting/aio/http2/frames.py
index 8809f8d..b216b4d 100644
--- a/dubbo/remoting/aio/http2/frames.py
+++ b/dubbo/remoting/aio/http2/frames.py
@@ -24,7 +24,7 @@ __all__ = [
"HeadersFrame",
"DataFrame",
"WindowUpdateFrame",
- "ResetStreamFrame",
+ "RstStreamFrame",
"PingFrame",
"UserActionFrames",
]
@@ -147,7 +147,7 @@ class WindowUpdateFrame(Http2Frame):
return f"<WindowUpdateFrame stream_id={self.stream_id}
delta={self.delta}>"
-class ResetStreamFrame(Http2Frame):
+class RstStreamFrame(Http2Frame):
"""
HTTP/2 reset stream frame.
"""
@@ -170,7 +170,9 @@ class ResetStreamFrame(Http2Frame):
self.error_code = error_code
def __repr__(self) -> str:
- return f"<ResetStreamFrame stream_id={self.stream_id}
error_code={self.error_code}>"
+ return (
+ f"<RstStreamFrame stream_id={self.stream_id}
error_code={self.error_code}>"
+ )
class PingFrame(Http2Frame):
@@ -178,9 +180,9 @@ class PingFrame(Http2Frame):
HTTP/2 ping frame.
"""
- __slots__ = ["data"]
+ __slots__ = ["data", "ack"]
- def __init__(self, data: bytes):
+ def __init__(self, data: bytes, ack: bool = False):
"""
Initialize the HTTP/2 ping frame.
:param data: The data.
@@ -188,10 +190,11 @@ class PingFrame(Http2Frame):
"""
super().__init__(0, Http2FrameType.PING, False)
self.data = data
+ self.ack = ack
def __repr__(self) -> str:
return f"<PingFrame data={self.data}>"
# User action frames.
-UserActionFrames = Union[HeadersFrame, DataFrame, ResetStreamFrame]
+UserActionFrames = Union[HeadersFrame, DataFrame, RstStreamFrame]
diff --git a/dubbo/remoting/aio/http2/protocol.py
b/dubbo/remoting/aio/http2/protocol.py
index fa96523..89ff26f 100644
--- a/dubbo/remoting/aio/http2/protocol.py
+++ b/dubbo/remoting/aio/http2/protocol.py
@@ -17,7 +17,7 @@ import abc
import asyncio
import struct
import time
-from typing import List, Optional, Tuple
+from typing import Optional
from h2.config import H2Configuration
from h2.connection import H2Connection
@@ -32,7 +32,7 @@ from dubbo.remoting.aio.http2.frames import (
HeadersFrame,
Http2Frame,
PingFrame,
- ResetStreamFrame,
+ RstStreamFrame,
UserActionFrames,
WindowUpdateFrame,
)
@@ -159,9 +159,7 @@ class AbstractHttp2Protocol(asyncio.Protocol, abc.ABC):
"""
frame_type = frame.frame_type
if frame_type == Http2FrameType.HEADERS:
- self._send_headers_frame(
- frame.stream_id, frame.headers.to_list(), frame.end_stream,
event
- )
+ self._send_headers_frame(frame, stream, event)
elif frame_type == Http2FrameType.DATA:
self._flow_controller.write_data(stream, frame, event)
elif frame_type == Http2FrameType.RST_STREAM:
@@ -171,22 +169,25 @@ class AbstractHttp2Protocol(asyncio.Protocol, abc.ABC):
def _send_headers_frame(
self,
- stream_id: int,
- headers: List[Tuple[str, str]],
- end_stream: bool,
+ frame: HeadersFrame,
+ stream: Http2Stream,
event: Optional[asyncio.Event] = None,
) -> None:
"""
Send the HTTP/2 headers frame.(thread-unsafe)
- :param stream_id: The stream identifier.
- :type stream_id: int
- :param headers: The headers.
- :type headers: List[Tuple[str, str]]
- :param end_stream: Whether the stream is ended.
- :type end_stream: bool
+ :param frame: The frame to send.
+ :type frame: HeadersFrame
+ :param stream: The stream.
+ :type stream: Http2Stream
:param event: The event to be set after sending the frame.
"""
- self._h2_connection.send_headers(stream_id, headers,
end_stream=end_stream)
+ if stream.id == -1:
+ stream.id = self._h2_connection.get_next_available_stream_id()
+ self._stream_handler.put_stream(stream.id, stream)
+
+ self._h2_connection.send_headers(
+ stream.id, frame.headers.to_list(), end_stream=frame.end_stream
+ )
self._flush()
EventHelper.set(event)
@@ -248,7 +249,7 @@ class AbstractHttp2Protocol(asyncio.Protocol, abc.ABC):
if isinstance(frame, WindowUpdateFrame):
# Because flow control may be at the connection level,
it is handled here
self._flow_controller.release_flow_control(frame)
- elif isinstance(frame, (HeadersFrame, DataFrame,
ResetStreamFrame)):
+ elif isinstance(frame, (HeadersFrame, DataFrame,
RstStreamFrame)):
# Handle the frame by the stream handler
self._stream_handler.handle_frame(frame)
else:
@@ -331,7 +332,7 @@ class Http2ClientProtocol(AbstractHttp2Protocol):
def _do_other_frame(self, frame: Http2Frame):
# Handle the ping frame
- if isinstance(frame, PingFrame):
+ if isinstance(frame, PingFrame) and frame.ack:
FutureHelper.set_result(self._ping_ack_future, None)
async def _heartbeat_loop(self):
diff --git a/dubbo/remoting/aio/http2/stream.py
b/dubbo/remoting/aio/http2/stream.py
index e610d7c..c072a95 100644
--- a/dubbo/remoting/aio/http2/stream.py
+++ b/dubbo/remoting/aio/http2/stream.py
@@ -23,7 +23,7 @@ from dubbo.remoting.aio.exceptions import StreamError
from dubbo.remoting.aio.http2.frames import (
DataFrame,
HeadersFrame,
- ResetStreamFrame,
+ RstStreamFrame,
UserActionFrames,
)
from dubbo.remoting.aio.http2.headers import Http2Headers
@@ -58,6 +58,13 @@ class Http2Stream(abc.ABC):
"""
return self._id
+ @id.setter
+ def id(self, stream_id: int) -> None:
+ """
+ Set the stream identifier.
+ """
+ self._id = stream_id
+
@property
def listener(self) -> "Http2Stream.Listener":
"""
@@ -261,8 +268,8 @@ class DefaultHttp2Stream(Http2Stream):
if self.local_closed:
# The stream has been closed locally.
return
- reset_frame = ResetStreamFrame(self.id, error_code)
- self._outbound_controller.write_rst(reset_frame)
+ rst_frame = RstStreamFrame(self.id, error_code)
+ self._outbound_controller.write_rst(rst_frame)
def receive_frame(self, frame: UserActionFrames) -> None:
"""
diff --git a/dubbo/remoting/aio/http2/stream_handler.py
b/dubbo/remoting/aio/http2/stream_handler.py
index 65ec7bd..73d2d4f 100644
--- a/dubbo/remoting/aio/http2/stream_handler.py
+++ b/dubbo/remoting/aio/http2/stream_handler.py
@@ -16,12 +16,10 @@
import asyncio
import uuid
-from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Optional
from dubbo.loggers import loggerFactory
-from dubbo.remoting.aio.exceptions import ProtocolError
from dubbo.remoting.aio.http2.frames import UserActionFrames
from dubbo.remoting.aio.http2.registries import Http2FrameType
from dubbo.remoting.aio.http2.stream import DefaultHttp2Stream, Http2Stream
@@ -107,6 +105,9 @@ class StreamMultiplexHandler:
# It must be ensured that the event loop is not blocked,
# and if there is a blocking operation, the executor must be used.
stream.receive_frame(frame)
+
+ if frame.end_stream and stream.local_closed:
+ self.remove_stream(frame.stream_id)
else:
_LOGGER.warning(
f"Stream {frame.stream_id} not found. Ignoring frame {frame}"
@@ -134,19 +135,9 @@ class StreamClientMultiplexHandler(StreamMultiplexHandler):
:return: The stream.
:rtype: DefaultHttp2Stream
"""
- future = futures.Future()
- self._protocol.get_next_stream_id(future)
- try:
- # block until the stream_id is created
- stream_id = future.result()
- new_stream = DefaultHttp2Stream(
- stream_id, listener, self._loop, self._protocol, self._executor
- )
- self.put_stream(stream_id, new_stream)
- except Exception as e:
- raise ProtocolError("Failed to create stream.") from e
-
- return new_stream
+ return DefaultHttp2Stream(
+ -1, listener, self._loop, self._protocol, self._executor
+ )
class StreamServerMultiplexHandler(StreamMultiplexHandler):
diff --git a/dubbo/remoting/aio/http2/utils.py
b/dubbo/remoting/aio/http2/utils.py
index 7cc4f66..32d52f0 100644
--- a/dubbo/remoting/aio/http2/utils.py
+++ b/dubbo/remoting/aio/http2/utils.py
@@ -22,7 +22,7 @@ from dubbo.remoting.aio.http2.frames import (
DataFrame,
HeadersFrame,
PingFrame,
- ResetStreamFrame,
+ RstStreamFrame,
WindowUpdateFrame,
)
from dubbo.remoting.aio.http2.headers import Http2Headers
@@ -40,14 +40,14 @@ class Http2EventUtils:
def convert_to_frame(
event: h2_event.Event,
) -> Union[
- HeadersFrame, DataFrame, ResetStreamFrame, WindowUpdateFrame,
PingFrame, None
+ HeadersFrame, DataFrame, RstStreamFrame, WindowUpdateFrame, PingFrame,
None
]:
"""
Convert a h2.events.Event to HTTP/2 Frame.
:param event: The H2 event.
:type event: h2.events.Event
:return: The HTTP/2 frame.
- :rtype: Union[HeadersFrame, DataFrame, ResetStreamFrame,
WindowUpdateFrame, PingFrame, None]
+ :rtype: Union[HeadersFrame, DataFrame, RstStreamFrame,
WindowUpdateFrame, PingFrame, None]
"""
if isinstance(
event,
@@ -73,14 +73,14 @@ class Http2EventUtils:
)
elif isinstance(event, h2_event.StreamReset):
# RST_STREAM frame.
- return ResetStreamFrame(
- event.stream_id, Http2ErrorCode.get(event.error_code)
- )
+ return RstStreamFrame(event.stream_id,
Http2ErrorCode.get(event.error_code))
elif isinstance(event, h2_event.WindowUpdated):
# WINDOW_UPDATE frame.
return WindowUpdateFrame(event.stream_id, event.delta)
- elif isinstance(event, h2_event.PingReceived):
+ elif isinstance(event, (h2_event.PingAckReceived,
h2_event.PingReceived)):
# PING frame.
- return PingFrame(event.ping_data)
+ return PingFrame(
+ event.ping_data, ack=isinstance(event,
h2_event.PingAckReceived)
+ )
return None
diff --git a/dubbo/utils.py b/dubbo/utils.py
index 47b404f..6900237 100644
--- a/dubbo/utils.py
+++ b/dubbo/utils.py
@@ -16,7 +16,11 @@
import socket
-__all__ = ["EventHelper", "FutureHelper", "NetworkUtils"]
+__all__ = ["EventHelper", "FutureHelper", "NetworkUtils", "CpuUtils"]
+
+from typing import List, Tuple
+
+import psutil
class EventHelper:
@@ -155,3 +159,71 @@ class NetworkUtils:
:rtype: str
"""
return socket.gethostbyname(NetworkUtils.get_host_name())
+
+
+class CpuUtils:
+ """
+ Helper class for CPU operations.
+ """
+
+ @staticmethod
+ def get_cpu_count(logical=True) -> int:
+ """
+ Get the number of CPUs in the system.
+
+ :return: The number of CPUs in the system.
+ :rtype: int
+ """
+ return psutil.cpu_count(logical=logical)
+
+ @staticmethod
+ def get_total_cpu_usage(interval=1) -> float:
+ """
+ Get the total CPU usage of the system.
+
+ :param interval: The interval in seconds.
+ :type interval: int
+ :return: The total CPU usage of the system.
+ :rtype: float
+ """
+ return psutil.cpu_percent(interval=interval)
+
+ @staticmethod
+ def get_per_cpu_usage(interval=1) -> List[float]:
+ """
+ Get the per CPU usage of the system.
+
+ :param interval: The interval in seconds.
+ :type interval: int
+ :return: The per CPU usage of the system.
+ :rtype: list
+ """
+ return psutil.cpu_percent(interval=interval, percpu=True)
+
+ @staticmethod
+ def get_load_avg() -> Tuple[float, float, float]:
+ """
+ Get the load average over the last 1, 5, and 15 minutes
+
+ :return: The load average of the system.
+ :rtype: list
+ """
+ return psutil.getloadavg()
+
+ @staticmethod
+ def get_cpu_stats():
+ """
+ Get the CPU stats of the system.
+
+ :return: The CPU stats of the system.
+ """
+ return psutil.cpu_stats()
+
+ @staticmethod
+ def get_cpu_freq():
+ """
+ Get the current CPU frequency.
+
+ :return: The current CPU frequency.
+ """
+ return psutil.cpu_freq()
diff --git a/requirements.txt b/requirements.txt
index dd0cffb..89dbdc6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
h2>=4.1.0
uvloop>=0.19.0
-kazoo>=2.10.0
\ No newline at end of file
+kazoo>=2.10.0
+psutil>=6.0.0
\ No newline at end of file
diff --git a/samples/registry/zookeeper/client.py
b/samples/registry/zookeeper/client.py
index 9c84db0..ff81ba7 100644
--- a/samples/registry/zookeeper/client.py
+++ b/samples/registry/zookeeper/client.py
@@ -13,6 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import time
+
import unary_unary_pb2
import dubbo
@@ -33,10 +35,14 @@ class UnaryServiceStub:
if __name__ == "__main__":
- registry_config = RegistryConfig.from_url("zookeeper://127.0.0.1:2181")
+ registry_config = RegistryConfig.from_url(
+ "zookeeper://127.0.0.1:2181"
+ )
bootstrap = dubbo.Dubbo(registry_config=registry_config)
- reference_config = ReferenceConfig(protocol="tri",
service="org.apache.dubbo.samples.registry.zk")
+ reference_config = ReferenceConfig(
+ protocol="tri", service="org.apache.dubbo.samples.registry.zk"
+ )
dubbo_client = bootstrap.create_client(reference_config)
unary_service_stub = UnaryServiceStub(dubbo_client)
diff --git a/samples/stream/server_stream/client.py
b/samples/stream/server_stream/client.py
index fa9d4c1..4b9cb39 100644
--- a/samples/stream/server_stream/client.py
+++ b/samples/stream/server_stream/client.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unary_stream_pb2
-from setuptools.extern import names
import dubbo
from dubbo.configs import ReferenceConfig
diff --git a/setup.py b/setup.py
index edb5703..4b91932 100644
--- a/setup.py
+++ b/setup.py
@@ -56,6 +56,10 @@ setup(
packages=find_packages(include=("dubbo", "dubbo.*")),
test_suite="tests",
python_requires=">=3.11",
- install_requires=["h2>=4.1.0", "uvloop>=0.19.0;
platform_system!='Windows'"],
+ install_requires=[
+ "h2>=4.1.0",
+ "uvloop>=0.19.0; platform_system!='Windows'",
+ "psutil>=6.0.0",
+ ],
extras_require={"zookeeper": ["kazoo>=2.10.0"]},
)