BewareMyPower commented on code in PR #277:
URL: 
https://github.com/apache/pulsar-client-python/pull/277#discussion_r2643310172


##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+class Consumer:
+    """
+    The Pulsar message consumer, used to subscribe to messages from a topic.
+    """
+
+    def __init__(self, consumer: _pulsar.Consumer, schema: 
pulsar.schema.Schema) -> None:
+        """
+        Create the consumer.
+        Users should not call this constructor directly. Instead, create the
+        consumer via `Client.subscribe`.
+
+        Parameters
+        ----------
+        consumer: _pulsar.Consumer
+            The underlying Consumer object from the C extension.
+        schema: pulsar.schema.Schema
+            The schema of the data that will be received by this consumer.
+        """
+        self._consumer = consumer
+        self._schema = schema
+
+    async def receive(self) -> pulsar.Message:
+        """
+        Receive a single message asynchronously.
+
+        Returns
+        -------
+        pulsar.Message
+            The message received.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.receive_async(functools.partial(_set_future, future))
+        msg = await future
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
+
+    async def acknowledge(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of a single message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_async(msg, functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def acknowledge_cumulative(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of all the messages in the stream up to (and
+        including) the provided message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_cumulative_async(
+            msg, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def unsubscribe(self) -> None:
+        """
+        Unsubscribe the current consumer from the topic asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.unsubscribe_async(functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+        """
+        Reset the subscription associated with this consumer to a specific
+        message id or publish timestamp asynchronously.
+
+        The message id can either be a specific message or represent the first
+        or last messages in the topic.
+
+        Parameters
+        ----------
+        messageid : MessageId or int
+            The message id for seek, OR an integer event time (timestamp) to
+            seek to
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(messageid, pulsar.MessageId):
+            msg_id = messageid._msg_id
+        elif isinstance(messageid, int):
+            msg_id = messageid
+        else:
+            raise ValueError(f"invalid messageid type {type(messageid)}")
+        self._consumer.seek_async(
+            msg_id, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def close(self) -> None:
+        """
+        Close the consumer asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.close_async(functools.partial(_set_future, future, 
value=None))
+        await future
+
+    def topic(self) -> str:
+        """
+        Return the topic this consumer is subscribed to.
+        """
+        return self._consumer.topic()
+
+    def subscription_name(self) -> str:
+        """
+        Return the subscription name.
+        """
+        return self._consumer.subscription_name()
+
+    def consumer_name(self) -> str:
+        """
+        Return the consumer name.
+        """
+        return self._consumer.consumer_name()
+

Review Comment:
   `is_connected` is a bad design; other methods can be implemented in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to