This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 2210d84  Correct logic in message delivery and directory management 
(#42)
2210d84 is described below

commit 2210d84f2627ee5c14084f3e715737e62511cf75
Author: Advait <[email protected]>
AuthorDate: Tue Jan 21 17:03:28 2025 +0530

    Correct logic in message delivery and directory management (#42)
    
    * Logical Error Fixed in delivererers.py inside src and 
cluster/directories.py
    
    * The Deliverers Module is deprecated and not used by any other module; 
furthermore, the documentation does not mention this module
---
 src/dubbo/cluster/directories.py |   2 +-
 src/dubbo/deliverers.py          | 310 ---------------------------------------
 2 files changed, 1 insertion(+), 311 deletions(-)

diff --git a/src/dubbo/cluster/directories.py b/src/dubbo/cluster/directories.py
index 866ce8b..cb13a33 100644
--- a/src/dubbo/cluster/directories.py
+++ b/src/dubbo/cluster/directories.py
@@ -46,7 +46,7 @@ class RegistryDirectory(Directory, NotifyListener):
         # create new invokers
         for url in urls:
             k = str(url)
-            if k in old_invokers.items():
+            if k in old_invokers:
                 self._invokers[k] = old_invokers[k]
                 del old_invokers[k]
             else:
diff --git a/src/dubbo/deliverers.py b/src/dubbo/deliverers.py
deleted file mode 100644
index fa962f0..0000000
--- a/src/dubbo/deliverers.py
+++ /dev/null
@@ -1,310 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-import enum
-import queue
-import threading
-from typing import Any, Optional
-
-__all__ = ["MessageDeliverer", "SingleMessageDeliverer", 
"MultiMessageDeliverer"]
-
-
-class DelivererStatus(enum.Enum):
-    """
-    Enumeration for deliverer status.
-
-    Possible statuses:
-        - PENDING: The deliverer is pending action.
-        - COMPLETED: The deliverer has completed the action.
-        - CANCELLED: The action for the deliverer has been cancelled.
-        - FINISHED: The deliverer has finished all actions and is in a final 
state.
-    """
-
-    PENDING = 0
-    COMPLETED = 1
-    CANCELLED = 2
-    FINISHED = 3
-
-    @classmethod
-    def change_allowed(cls, current_status: "DelivererStatus", target_status: 
"DelivererStatus") -> bool:
-        """
-        Check if a transition from `current_status` to `target_status` is 
allowed.
-
-        :param current_status: The current status of the deliverer.
-        :type current_status: DelivererStatus
-        :param target_status: The target status to transition to.
-        :type target_status: DelivererStatus
-        :return: A boolean indicating if the transition is allowed.
-        :rtype: bool
-        """
-        # PENDING -> COMPLETED or CANCELLED
-        if current_status == cls.PENDING:
-            return target_status in {cls.COMPLETED, cls.CANCELLED}
-
-        # COMPLETED -> FINISHED or CANCELLED
-        elif current_status == cls.COMPLETED:
-            return target_status in {cls.FINISHED, cls.CANCELLED}
-
-        # CANCELLED -> FINISHED
-        elif current_status == cls.CANCELLED:
-            return target_status == cls.FINISHED
-
-        # FINISHED is the final state, no further transitions allowed
-        else:
-            return False
-
-
-class NoMoreMessageError(RuntimeError):
-    """
-    Exception raised when no more messages are available.
-    """
-
-    def __init__(self, message: str = "No more message"):
-        super().__init__(message)
-
-
-class EmptyMessageError(RuntimeError):
-    """
-    Exception raised when the message is empty.
-    """
-
-    def __init__(self, message: str = "Message is empty"):
-        super().__init__(message)
-
-
-class MessageDeliverer(abc.ABC):
-    """
-    Abstract base class for message deliverers.
-    """
-
-    __slots__ = ["_status"]
-
-    def __init__(self):
-        self._status = DelivererStatus.PENDING
-
-    @abc.abstractmethod
-    def add(self, message: Any) -> None:
-        """
-        Add a message to the deliverer.
-
-        :param message: The message to be added.
-        :type message: Any
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def complete(self, message: Any = None) -> None:
-        """
-        Mark the message delivery as complete.
-
-        :param message: The last message (optional).
-        :type message: Any, optional
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def cancel(self, exc: Optional[Exception]) -> None:
-        """
-        Cancel the message delivery.
-
-        :param exc: The exception that caused the cancellation.
-        :type exc: Exception, optional
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def get(self) -> Any:
-        """
-        Get the next message.
-
-        :return: The next message.
-        :rtype: Any
-        :raises NoMoreMessageError: If no more messages are available.
-        :raises Exception: If the message delivery is cancelled.
-        """
-        raise NotImplementedError()
-
-    @abc.abstractmethod
-    def get_nowait(self) -> Any:
-        """
-        Get the next message without waiting.
-
-        :return: The next message.
-        :rtype: Any
-        :raises EmptyMessageError: If the message is empty.
-        :raises NoMoreMessageError: If no more messages are available.
-        :raises Exception: If the message delivery is cancelled.
-        """
-        raise NotImplementedError()
-
-
-class SingleMessageDeliverer(MessageDeliverer):
-    """
-    Message deliverer for a single message using a signal-based approach.
-    """
-
-    __slots__ = ["_condition", "_message"]
-
-    def __init__(self):
-        super().__init__()
-        self._condition = threading.Condition()
-        self._message: Any = None
-
-    def add(self, message: Any) -> None:
-        with self._condition:
-            if self._status is DelivererStatus.PENDING:
-                # Add the message
-                self._message = message
-
-    def complete(self, message: Any = None) -> None:
-        with self._condition:
-            if DelivererStatus.change_allowed(self._status, 
DelivererStatus.COMPLETED):
-                if message is not None:
-                    self._message = message
-                # update the status
-                self._status = DelivererStatus.COMPLETED
-                self._condition.notify_all()
-
-    def cancel(self, exc: Optional[Exception]) -> None:
-        with self._condition:
-            if DelivererStatus.change_allowed(self._status, 
DelivererStatus.CANCELLED):
-                # Cancel the delivery
-                self._message = exc or RuntimeError("delivery cancelled.")
-                self._status = DelivererStatus.CANCELLED
-                self._condition.notify_all()
-
-    def get(self) -> Any:
-        with self._condition:
-            if self._status is DelivererStatus.FINISHED:
-                raise NoMoreMessageError("Message already consumed.")
-
-            if self._status is DelivererStatus.PENDING:
-                # If the message is not available, wait
-                self._condition.wait()
-
-            # check the status
-            if self._status is DelivererStatus.CANCELLED:
-                raise self._message
-
-            self._status = DelivererStatus.FINISHED
-            return self._message
-
-    def get_nowait(self) -> Any:
-        with self._condition:
-            if self._status is DelivererStatus.FINISHED:
-                self._status = DelivererStatus.PENDING
-                return self._message
-
-            # raise error
-            if self._status is DelivererStatus.FINISHED:
-                raise NoMoreMessageError("Message already consumed.")
-            elif self._status is DelivererStatus.CANCELLED:
-                raise self._message
-            elif self._status is DelivererStatus.PENDING:
-                raise EmptyMessageError("Message is empty")
-
-
-class MultiMessageDeliverer(MessageDeliverer):
-    """
-    Message deliverer supporting multiple messages.
-    """
-
-    __slots__ = ["_lock", "_counter", "_messages", "_END_SENTINEL"]
-
-    def __init__(self):
-        super().__init__()
-        self._lock = threading.Lock()
-        self._counter = 0
-        self._messages: queue.PriorityQueue[Any] = queue.PriorityQueue()
-        self._END_SENTINEL = object()
-
-    def add(self, message: Any) -> None:
-        with self._lock:
-            if self._status is DelivererStatus.PENDING:
-                # Add the message
-                self._counter += 1
-                self._messages.put_nowait((self._counter, message))
-
-    def complete(self, message: Any = None) -> None:
-        with self._lock:
-            if DelivererStatus.change_allowed(self._status, 
DelivererStatus.COMPLETED):
-                if message is not None:
-                    self._counter += 1
-                    self._messages.put_nowait((self._counter, message))
-
-                # Add the end sentinel
-                self._counter += 1
-                self._messages.put_nowait((self._counter, self._END_SENTINEL))
-                self._status = DelivererStatus.COMPLETED
-
-    def cancel(self, exc: Optional[Exception]) -> None:
-        with self._lock:
-            if DelivererStatus.change_allowed(self._status, 
DelivererStatus.CANCELLED):
-                # Set the priority to -1 -> make sure it is the first message
-                self._messages.put_nowait((-1, exc or RuntimeError("delivery 
cancelled.")))
-                self._status = DelivererStatus.CANCELLED
-
-    def get(self) -> Any:
-        if self._status is DelivererStatus.FINISHED:
-            raise NoMoreMessageError("No more message")
-
-        # block until the message is available
-        priority, message = self._messages.get()
-
-        # check the status
-        if self._status is DelivererStatus.CANCELLED:
-            raise message
-        elif message is self._END_SENTINEL:
-            self._status = DelivererStatus.FINISHED
-            raise NoMoreMessageError("No more message")
-        else:
-            return message
-
-    def get_nowait(self) -> Any:
-        try:
-            if self._status is DelivererStatus.FINISHED:
-                raise NoMoreMessageError("No more message")
-
-            priority, message = self._messages.get_nowait()
-
-            # check the status
-            if self._status is DelivererStatus.CANCELLED:
-                raise message
-            elif message is self._END_SENTINEL:
-                self._status = DelivererStatus.FINISHED
-                raise NoMoreMessageError("No more message")
-            else:
-                return message
-        except queue.Empty:
-            raise EmptyMessageError("Message is empty")
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        """
-        Returns the next request from the queue.
-
-        :return: The next message.
-        :rtype: Any
-        :raises StopIteration: If no more messages are available.
-        """
-        while True:
-            try:
-                return self.get()
-            except NoMoreMessageError:
-                raise StopIteration

Reply via email to