This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 75a57b4  Improve apidoc with pydoctor tool  (#28)
75a57b4 is described below

commit 75a57b427d4c6944c49f4b712344107b5444aa36
Author: tison <[email protected]>
AuthorDate: Sun Nov 6 15:57:22 2022 +0800

    Improve apidoc with pydoctor tool  (#28)
    
    Using the [numpy doc 
style](https://numpydoc.readthedocs.io/en/latest/format.html). This format is 
also compatible with `pdoc` by specifying `--docformat=numpy`.
---
 README.md                    |  14 +
 pulsar/__init__.py           | 916 ++++++++++++++++++++++---------------------
 pulsar/functions/context.py  | 301 +++++++-------
 pulsar/functions/function.py |  22 +-
 pulsar/functions/serde.py    |  83 ++--
 5 files changed, 686 insertions(+), 650 deletions(-)

diff --git a/README.md b/README.md
index b0857a0..c25c98f 100644
--- a/README.md
+++ b/README.md
@@ -110,3 +110,17 @@ Run a single unit test (e.g. `PulsarTest.test_tls_auth`):
 ```bash
 python3 ./tests/pulsar_test.py 'PulsarTest.test_tls_auth'
 ```
+
+## Generate API docs
+
+Pulsar Python Client uses [pydoctor](https://github.com/twisted/pydoctor) to 
generate API docs. To generate them yourself, run the following command in the 
root path of this repository:
+
+```bash
+pip3 install pydoctor
+pydoctor --make-html \
+  
--html-viewsource-base=https://github.com/apache/pulsar-client-python/tree/<release-version-tag>
 \
+  --docformat=numpy --theme=readthedocs \
+  --intersphinx=https://docs.python.org/3/objects.inv \
+  --html-output=<path-to-apidocs> \
+  pulsar
+```
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 6fd38d1..c1195de 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -23,81 +23,23 @@ All the same features are exposed through the Python 
interface.
 
 Currently, the supported Python versions are 3.7, 3.8, 3.9 and 3.10.
 
-## Install from PyPI
+=================
+Install from PyPI
+=================
 
-Download Python wheel binary files for MacOS and Linux
-directly from the PyPI archive.
+Download Python wheel binary files for macOS and Linux directly from
+the PyPI archive:
 
-    #!shell
-    $ sudo pip install pulsar-client
+.. code-block:: shell
 
-## Install from sources
+    sudo pip install pulsar-client
 
-Follow the instructions to compile the Pulsar C++ client library. This method
-will also build the Python binding for the library.
+========================
+Install from source code
+========================
 
-To install the Python bindings:
-
-    #!shell
-    $ cd pulsar-client-cpp/python
-    $ sudo python setup.py install
-
-## Examples
-
-### [Producer](#pulsar.Producer) example
-
-    #!python
-    import pulsar
-
-    client = pulsar.Client('pulsar://localhost:6650')
-
-    producer = client.create_producer('my-topic')
-
-    for i in range(10):
-        producer.send(('Hello-%d' % i).encode('utf-8'))
-
-    client.close()
-
-#### [Consumer](#pulsar.Consumer) Example
-
-    #!python
-    import pulsar
-
-    client = pulsar.Client('pulsar://localhost:6650')
-
-    consumer = client.subscribe('my-topic', 'my-subscription')
-
-    while True:
-        msg = consumer.receive()
-        try:
-            print("Received message '{}' id='{}'".format(msg.data(), 
msg.message_id()))
-            consumer.acknowledge(msg)
-        except Exception:
-            consumer.negative_acknowledge(msg)
-
-    client.close()
-
-### [Async producer](#pulsar.Producer.send_async) example
-
-    #!python
-    import pulsar
-
-    client = pulsar.Client('pulsar://localhost:6650')
-
-    producer = client.create_producer(
-                    'my-topic',
-                    block_if_queue_full=True,
-                    batching_enabled=True,
-                    batching_max_publish_delay_ms=10
-                )
-
-    def send_callback(res, msg_id):
-        print('Message published res=%s', res)
-
-    while True:
-        producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
-
-    client.close()
+Read the instructions on `source code repository
+<https://github.com/apache/pulsar-client-python#install-the-python-wheel>`_.
 """
 
 import logging
@@ -122,16 +64,21 @@ from datetime import timedelta
 
 class MessageId:
     """
-    Represents a message id
+    Represents a message id.
+
+    Attributes
+    ----------
+
+    earliest:
+        Represents the earliest message stored in a topic
+    latest:
+        Represents the latest message published on a topic
     """
 
     def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, 
batch_index=-1):
         self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, 
batch_index)
 
-    'Represents the earliest message stored in a topic'
     earliest = _pulsar.MessageId.earliest
-
-    'Represents the latest message published on a topic'
     latest = _pulsar.MessageId.latest
 
     def ledger_id(self):
@@ -149,7 +96,7 @@ class MessageId:
     def serialize(self):
         """
         Returns a bytes representation of the message id.
-        This bytes sequence can be stored and later deserialized.
+        This byte sequence can be stored and later deserialized.
         """
         return self._msg_id.serialize()
 
@@ -208,7 +155,7 @@ class Message:
 
     def message_id(self):
         """
-        The message ID that can be used to refere to this particular message.
+        The message ID that can be used to refer to this particular message.
         """
         return self._message.message_id()
 
@@ -266,12 +213,13 @@ class Authentication:
         """
         Create the authentication provider instance.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `dynamicLibPath`: Path to the authentication provider shared library
-          (such as `tls.so`)
-        * `authParamsString`: Comma-separated list of provider-specific
-          configuration params
+        dynamicLibPath: str
+            Path to the authentication provider shared library (such as 
``tls.so``)
+        authParamsString: str
+            Comma-separated list of provider-specific configuration params
         """
         _check_type(str, dynamicLibPath, 'dynamicLibPath')
         _check_type(str, authParamsString, 'authParamsString')
@@ -286,10 +234,13 @@ class AuthenticationTLS(Authentication):
         """
         Create the TLS authentication provider instance.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `certificatePath`: Path to the public certificate
-        * `privateKeyPath`: Path to private TLS key
+        certificate_path: str
+            Path to the public certificate
+        private_key_path: str
+            Path to private TLS key
         """
         _check_type(str, certificate_path, 'certificate_path')
         _check_type(str, private_key_path, 'private_key_path')
@@ -304,10 +255,11 @@ class AuthenticationToken(Authentication):
         """
         Create the token authentication provider instance.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `token`: A string containing the token or a functions that provides a
-                   string with the token
+        token
+            A string containing the token or a function that provides a 
string with the token
         """
         if not (isinstance(token, str) or callable(token)):
             raise ValueError("Argument token is expected to be of type 'str' 
or a function returning 'str'")
@@ -322,9 +274,11 @@ class AuthenticationAthenz(Authentication):
         """
         Create the Athenz authentication provider instance.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `auth_params_string`: JSON encoded configuration for Athenz client
+        auth_params_string: str
+            JSON encoded configuration for Athenz client
         """
         _check_type(str, auth_params_string, 'auth_params_string')
         self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
@@ -337,9 +291,11 @@ class AuthenticationOauth2(Authentication):
         """
         Create the Oauth2 authentication provider instance.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `auth_params_string`: JSON encoded configuration for Oauth2 client
+        auth_params_string: str
+            JSON encoded configuration for Oauth2 client
         """
         _check_type(str, auth_params_string, 'auth_params_string')
         self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
@@ -355,23 +311,26 @@ class AuthenticationBasic(Authentication):
         For example, if you want to create a basic authentication instance 
whose
         username is "my-user" and password is "my-pass", there are two ways:
 
-        ```
-        auth = AuthenticationBasic('my-user', 'my-pass')
-        auth = AuthenticationBasic(auth_params_string='{"username": "my-user", 
"password": "my-pass"}')
-        ```
-
-        **Args**
-        * username : str, optional
-        * password : str, optional
-        * method : str, optional
-            The authentication method name (default is 'basic')
-        * auth_params_string : str, optional
-            The JSON presentation of all fields above (default is None)
-            If it's not None, the other parameters will be ignored.
+        .. code-block:: python
+
+            auth = AuthenticationBasic('my-user', 'my-pass')
+            auth = AuthenticationBasic(auth_params_string='{"username": 
"my-user", "password": "my-pass"}')
+
+
+        Parameters
+        ----------
+        username : str, optional
+        password : str, optional
+        method : str, default='basic'
+            The authentication method name
+        auth_params_string : str, optional
+            The JSON presentation of all fields above. If it's not None, the 
other parameters will be ignored.
             Here is an example JSON presentation:
-              {"username": "my-user", "password": "my-pass", "method": 
"oms3.0"}
-            The `username` and `password` fields are required. If the "method" 
field is not set,
-            it will be "basic" by default.
+
+                {"username": "my-user", "password": "my-pass", "method": 
"oms3.0"}
+
+            The ``username`` and ``password`` fields are required. If the 
"method" field is not set, it will be
+            "basic" by default.
         """
         if auth_params_string is not None:
             _check_type(str, auth_params_string, 'auth_params_string')
@@ -409,53 +368,50 @@ class Client:
         """
         Create a new Pulsar client instance.
 
-        **Args**
-
-        * `service_url`: The Pulsar service url eg: 
pulsar://my-broker.com:6650/
-
-        **Options**
-
-        * `authentication`:
-          Set the authentication provider to be used with the broker. For 
example:
-          `AuthenticationTls`, `AuthenticationToken`, `AuthenticationAthenz` 
or `AuthenticationOauth2`
-        * `operation_timeout_seconds`:
-          Set timeout on client operations (subscribe, create producer, close,
-          unsubscribe).
-        * `io_threads`:
-          Set the number of IO threads to be used by the Pulsar client.
-        * `message_listener_threads`:
-          Set the number of threads to be used by the Pulsar client when
-          delivering messages through message listener. The default is 1 thread
-          per Pulsar client. If using more than 1 thread, messages for distinct
-          `message_listener`s will be delivered in different threads, however a
-          single `MessageListener` will always be assigned to the same thread.
-        * `concurrent_lookup_requests`:
-          Number of concurrent lookup-requests allowed on each broker 
connection
-          to prevent overload on the broker.
-        * `log_conf_file_path`:
-          Initialize log4cxx from a configuration file.
-        * `use_tls`:
-          Configure whether to use TLS encryption on the connection. This 
setting
-          is deprecated. TLS will be automatically enabled if the `serviceUrl` 
is
-          set to `pulsar+ssl://` or `https://`
-        * `tls_trust_certs_file_path`:
-          Set the path to the trusted TLS certificate file. If empty defaults 
to
-          certifi.
-        * `tls_allow_insecure_connection`:
-          Configure whether the Pulsar client accepts untrusted TLS 
certificates
-          from the broker.
-        * `tls_validate_hostname`:
-          Configure whether the Pulsar client validates that the hostname of 
the
-          endpoint, matches the common name on the TLS certificate presented by
-          the endpoint.
-        * `logger`:
-          Set a Python logger for this Pulsar client. Should be an instance of 
`logging.Logger`.
-        * `connection_timeout_ms`:
-          Set timeout in milliseconds on TCP connections.
-        * `listener_name`:
-          Listener name for lookup. Clients can use listenerName to choose one 
of the listeners
-          as the service URL to create a connection to the broker as long as 
the network is accessible.
-          advertisedListeners must enabled in broker side.
+        Parameters
+        ----------
+
+        service_url: str
+            The Pulsar service url eg: pulsar://my-broker.com:6650/
+        authentication: Authentication, optional
+            Set the authentication provider to be used with the broker. 
Supported methods:
+
+            * `AuthenticationTLS`
+            * `AuthenticationToken`
+            * `AuthenticationAthenz`
+            * `AuthenticationOauth2`
+        operation_timeout_seconds: int, default=30
+            Set timeout on client operations (subscribe, create producer, 
close, unsubscribe).
+        io_threads: int, default=1
+            Set the number of IO threads to be used by the Pulsar client.
+        message_listener_threads: int, default=1
+            Set the number of threads to be used by the Pulsar client when 
delivering messages through
+            message listener. The default is 1 thread per Pulsar client. If 
using more than 1 thread,
+            messages for distinct ``message_listener`` callbacks will be delivered in 
different threads, however a
+            single ``MessageListener`` will always be assigned to the same 
thread.
+        concurrent_lookup_requests: int, default=50000
+            Number of concurrent lookup-requests allowed on each broker 
connection to prevent overload
+            on the broker.
+        log_conf_file_path: str, optional
+            Initialize log4cxx from a configuration file.
+        use_tls: bool, default=False
+            Configure whether to use TLS encryption on the connection. This 
setting is deprecated.
+            TLS will be automatically enabled if the ``service_url`` is set to 
``pulsar+ssl://`` or ``https://``
+        tls_trust_certs_file_path: str, optional
+            Set the path to the trusted TLS certificate file. If empty 
defaults to certifi.
+        tls_allow_insecure_connection: bool, default=False
+            Configure whether the Pulsar client accepts untrusted TLS 
certificates from the broker.
+        tls_validate_hostname: bool, default=False
+            Configure whether the Pulsar client validates that the hostname of 
the endpoint,
+            matches the common name on the TLS certificate presented by the 
endpoint.
+        logger: optional
+            Set a Python logger for this Pulsar client. Should be an instance 
of `logging.Logger`.
+        connection_timeout_ms: int, default=10000
+            Set timeout in milliseconds on TCP connections.
+        listener_name: str, optional
+            Listener name for lookup. Clients can use ``listener_name`` to choose 
one of the listeners as
+            the service URL to create a connection to the broker as long as 
the network is accessible.
+            ``advertisedListeners`` must be enabled on the broker side.
         """
         _check_type(str, service_url, 'service_url')
         _check_type_or_none(Authentication, authentication, 'authentication')
@@ -539,89 +495,92 @@ class Client:
         """
         Create a new producer on a given topic.
 
-        **Args**
-
-        * `topic`:
-          The topic name
-
-        **Options**
-
-        * `producer_name`:
-           Specify a name for the producer. If not assigned,
-           the system will generate a globally unique name which can be 
accessed
-           with `Producer.producer_name()`. When specifying a name, it is app 
to
-           the user to ensure that, for a given topic, the producer name is 
unique
-           across all Pulsar's clusters.
-        * `schema`:
-           Define the schema of the data that will be published by this 
producer.
-           The schema will be used for two purposes:
-             - Validate the data format against the topic defined schema
-             - Perform serialization/deserialization between data and objects
-           An example for this parameter would be to pass 
`schema=JsonSchema(MyRecordClass)`.
-        * `initial_sequence_id`:
-           Set the baseline for the sequence ids for messages
-           published by the producer. First message will be using
-           `(initialSequenceId + 1)`` as its sequence id and subsequent 
messages will
-           be assigned incremental sequence ids, if not otherwise specified.
-        * `send_timeout_millis`:
-          If a message is not acknowledged by the server before the
-          `send_timeout` expires, an error will be reported.
-        * `compression_type`:
-          Set the compression type for the producer. By default, message
-          payloads are not compressed. Supported compression types are
-          `CompressionType.LZ4`, `CompressionType.ZLib`, 
`CompressionType.ZSTD` and `CompressionType.SNAPPY`.
-          ZSTD is supported since Pulsar 2.3. Consumers will need to be at 
least at that
-          release in order to be able to receive messages compressed with ZSTD.
-          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at 
least at that
-          release in order to be able to receive messages compressed with 
SNAPPY.
-        * `max_pending_messages`:
-          Set the max size of the queue holding the messages pending to receive
-          an acknowledgment from the broker.
-        * `max_pending_messages_across_partitions`:
-          Set the max size of the queue holding the messages pending to receive
-          an acknowledgment across partitions from the broker.
-        * `block_if_queue_full`: Set whether `send_async` operations should
-          block when the outgoing message queue is full.
-        * `message_routing_mode`:
-          Set the message routing mode for the partitioned producer. Default 
is `PartitionsRoutingMode.RoundRobinDistribution`,
-          other option is `PartitionsRoutingMode.UseSinglePartition`
-        * `lazy_start_partitioned_producers`:
-          This config affects producers of partitioned topics only. It 
controls whether
-          producers register and connect immediately to the owner broker of 
each partition
-          or start lazily on demand. The internal producer of one partition is 
always
-          started eagerly, chosen by the routing policy, but the internal 
producers of
-          any additional partitions are started on demand, upon receiving 
their first
-          message.
-          Using this mode can reduce the strain on brokers for topics with 
large numbers of
-          partitions and when the SinglePartition routing policy is used 
without keyed messages.
-          Because producer connection can be on demand, this can produce extra 
send latency
-          for the first messages of a given partition.
-        * `properties`:
-          Sets the properties for the producer. The properties associated with 
a producer
-          can be used for identify a producer at broker side.
-        * `batching_type`:
-          Sets the batching type for the producer.
-          There are two batching type: DefaultBatching and KeyBasedBatching.
-            - Default batching
-            incoming single messages:
-            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, 
v3), (k2, v3), (k3, v3)
-            batched into single batch message:
-            [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, 
v3), (k2, v3), (k3, v3)]
-
-            - KeyBasedBatching
-            incoming single messages:
-            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, 
v3), (k2, v3), (k3, v3)
-            batched into single batch message:
-            [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], 
[(k3, v1), (k3, v2), (k3, v3)]
-        * `chunking_enabled`:
-          If message size is higher than allowed max publish-payload size by 
broker then chunking_enabled
-          helps producer to split message into multiple chunks and publish 
them to broker separately and in
-          order. So, it allows client to successfully publish large size of 
messages in pulsar.
-        * encryption_key:
-           The key used for symmetric encryption, configured on the producer 
side
-        * crypto_key_reader:
-           Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
-           and private key decryption messages for the consumer
+        Parameters
+        ----------
+
+        topic: str
+            The topic name
+        producer_name: str, optional
+            Specify a name for the producer. If not assigned, the system will 
generate a globally unique name
+            which can be accessed with `Producer.producer_name()`. When 
specifying a name, it is up to the user
+            to ensure that, for a given topic, the producer name is unique 
across all Pulsar's clusters.
+        schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
+            Define the schema of the data that will be published by this 
producer, e.g.,
+            ``schema=JsonSchema(MyRecordClass)``.
+
+            The schema will be used for two purposes:
+                * Validate the data format against the topic defined schema
+                * Perform serialization/deserialization between data and 
objects
+        initial_sequence_id: int, optional
+            Set the baseline for the sequence ids for messages published by 
the producer. First message will be
+            using ``(initial_sequence_id + 1)`` as its sequence id and 
subsequent messages will be assigned
+            incremental sequence ids, if not otherwise specified.
+        send_timeout_millis: int, default=30000
+            If a message is not acknowledged by the server before the 
``send_timeout_millis`` expires, an error will be reported.
+        compression_type: CompressionType, default=CompressionType.NONE
+            Set the compression type for the producer. By default, message 
payloads are not compressed.
+
+            Supported compression types:
+
+            * CompressionType.LZ4
+            * CompressionType.ZLib
+            * CompressionType.ZSTD
+            * CompressionType.SNAPPY
+
+            ZSTD is supported since Pulsar 2.3. Consumers will need to be at 
least at that release in order to
+            be able to receive messages compressed with ZSTD.
+
+            SNAPPY is supported since Pulsar 2.4. Consumers will need to be at 
least at that release in order to
+            be able to receive messages compressed with SNAPPY.
+        max_pending_messages: int, default=1000
+            Set the max size of the queue holding the messages pending to 
receive an acknowledgment from the broker.
+        max_pending_messages_across_partitions: int, default=50000
+            Set the max size of the queue holding the messages pending to 
receive an acknowledgment across partitions
+            from the broker.
+        block_if_queue_full: bool, default=False
+            Set whether `send_async` operations should block when the outgoing 
message queue is full.
+        message_routing_mode: PartitionsRoutingMode, 
default=PartitionsRoutingMode.RoundRobinDistribution
+            Set the message routing mode for the partitioned producer.
+
+            Supported modes:
+
+            * `PartitionsRoutingMode.RoundRobinDistribution`
+            * `PartitionsRoutingMode.UseSinglePartition`
+        lazy_start_partitioned_producers: bool, default=False
+            This config affects producers of partitioned topics only. It 
controls whether producers register
+            and connect immediately to the owner broker of each partition or 
start lazily on demand. The internal
+            producer of one partition is always started eagerly, chosen by the 
routing policy, but the internal
+            producers of any additional partitions are started on demand, upon 
receiving their first message.
+
+            Using this mode can reduce the strain on brokers for topics with 
large numbers of partitions and when
+            the SinglePartition routing policy is used without keyed messages. 
Because producer connection can be
+            on demand, this can produce extra send latency for the first 
messages of a given partition.
+        properties: dict, optional
+            Sets the properties for the producer. The properties associated 
with a producer can be used to identify
+            a producer at broker side.
+        batching_type: BatchingType, default=BatchingType.Default
+            Sets the batching type for the producer.
+
+            There are two batching types: DefaultBatching and KeyBasedBatching.
+
+            DefaultBatching will batch single messages:
+                (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), 
(k1, v3), (k2, v3), (k3, v3)
+            ... into single batch message:
+                [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), 
(k1, v3), (k2, v3), (k3, v3)]
+
+            KeyBasedBatching will batch incoming single messages:
+                (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), 
(k1, v3), (k2, v3), (k3, v3)
+            ... into single batch message:
+                [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, 
v3)], [(k3, v1), (k3, v2), (k3, v3)]
+        chunking_enabled: bool, default=False
+            If message size is higher than allowed max publish-payload size by 
broker then chunking_enabled helps
+            producer to split message into multiple chunks and publish them to 
broker separately and in order.
+            So, it allows the client to successfully publish large 
messages in Pulsar.
+        encryption_key: str, optional
+            The key used for symmetric encryption, configured on the producer 
side
+        crypto_key_reader: CryptoKeyReader, optional
+            Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
+            and private key decryption messages for the consumer
         """
         _check_type(str, topic, 'topic')
         _check_type_or_none(str, producer_name, 'producer_name')
@@ -703,100 +662,84 @@ class Client:
         """
         Subscribe to the given topic and subscription combination.
 
-        **Args**
+        Parameters
+        ----------
+
+        topic:
+            The name of the topic, list of topics or regex pattern. This 
method will accept these forms:
+            * ``topic='my-topic'``
+            * ``topic=['topic-1', 'topic-2', 'topic-3']``
+            * ``topic=re.compile('persistent://public/default/topic-*')``
+        subscription_name: str
+            The name of the subscription.
+        consumer_type: ConsumerType, default=ConsumerType.Exclusive
+            Select the subscription type to be used when subscribing to the 
topic.
+        schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
+            Define the schema of the data that will be received by this 
consumer.
+        message_listener: optional
+            Sets a message listener for the consumer. When the listener is 
set, the application will
+            receive messages through it. Calls to ``consumer.receive()`` will 
not be allowed.
+            The listener function needs to accept (consumer, message), for 
example:
+
+            .. code-block:: python
 
-        * `topic`: The name of the topic, list of topics or regex pattern.
-                  This method will accept these forms:
-                    - `topic='my-topic'`
-                    - `topic=['topic-1', 'topic-2', 'topic-3']`
-                    - `topic=re.compile('persistent://public/default/topic-*')`
-        * `subscription`: The name of the subscription.
-
-        **Options**
-
-        * `consumer_type`:
-          Select the subscription type to be used when subscribing to the 
topic.
-        * `schema`:
-           Define the schema of the data that will be received by this 
consumer.
-        * `message_listener`:
-          Sets a message listener for the consumer. When the listener is set,
-          the application will receive messages through it. Calls to
-          `consumer.receive()` will not be allowed. The listener function needs
-          to accept (consumer, message), for example:
-
-                #!python
                 def my_listener(consumer, message):
                     # process message
                     consumer.acknowledge(message)
-
-        * `receiver_queue_size`:
-          Sets the size of the consumer receive queue. The consumer receive
-          queue controls how many messages can be accumulated by the consumer
-          before the application calls `receive()`. Using a higher value could
-          potentially increase the consumer throughput at the expense of higher
-          memory utilization. Setting the consumer queue size to zero decreases
-          the throughput of the consumer by disabling pre-fetching of messages.
-          This approach improves the message distribution on shared 
subscription
-          by pushing messages only to those consumers that are ready to process
-          them. Neither receive with timeout nor partitioned topics can be used
-          if the consumer queue size is zero. The `receive()` function call
-          should not be interrupted when the consumer queue size is zero. The
-          default value is 1000 messages and should work well for most use
-          cases.
-        * `max_total_receiver_queue_size_across_partitions`
-          Set the max total receiver queue size across partitions.
-          This setting will be used to reduce the receiver queue size for 
individual partitions
-        * `consumer_name`:
-          Sets the consumer name.
-        * `unacked_messages_timeout_ms`:
-          Sets the timeout in milliseconds for unacknowledged messages. The
-          timeout needs to be greater than 10 seconds. An exception is thrown 
if
-          the given value is less than 10 seconds. If a successful
-          acknowledgement is not sent within the timeout, all the 
unacknowledged
-          messages are redelivered.
-        * `negative_ack_redelivery_delay_ms`:
-           The delay after which to redeliver the messages that failed to be
-           processed (with the `consumer.negative_acknowledge()`)
-        * `broker_consumer_stats_cache_time_ms`:
-          Sets the time duration for which the broker-side consumer stats will
-          be cached in the client.
-        * `is_read_compacted`:
-          Selects whether to read the compacted version of the topic
-        * `properties`:
-          Sets the properties for the consumer. The properties associated with 
a consumer
-          can be used for identify a consumer at broker side.
-        * `pattern_auto_discovery_period`:
-          Periods of seconds for consumer to auto discover match topics.
-        * `initial_position`:
-          Set the initial position of a consumer  when subscribing to the 
topic.
+        receiver_queue_size: int, default=1000
+            Sets the size of the consumer receive queue. The consumer receive 
queue controls how many messages can be
+            accumulated by the consumer before the application calls 
`receive()`. Using a higher value could potentially
+            increase the consumer throughput at the expense of higher memory 
utilization. Setting the consumer queue
+            size to zero decreases the throughput of the consumer by disabling 
pre-fetching of messages.
+
+            This approach improves the message distribution on shared 
subscription by pushing messages only to those
+            consumers that are ready to process them. Neither receive with 
timeout nor partitioned topics can be used
+            if the consumer queue size is zero. The `receive()` function call 
should not be interrupted when the
+            consumer queue size is zero. The default value is 1000 messages 
and should work well for most use cases.
+        max_total_receiver_queue_size_across_partitions: int, default=50000
+            Set the max total receiver queue size across partitions. This 
setting will be used to reduce the
+            receiver queue size for individual partitions
+        consumer_name: str, optional
+            Sets the consumer name.
+        unacked_messages_timeout_ms: int, optional
+            Sets the timeout in milliseconds for unacknowledged messages. The 
timeout needs to be greater than
+            10 seconds. An exception is thrown if the given value is less than 
10 seconds. If a successful
+            acknowledgement is not sent within the timeout, all the 
unacknowledged messages are redelivered.
+        negative_ack_redelivery_delay_ms: int, default=60000
+            The delay after which to redeliver the messages that failed to be 
processed
+            (with the ``consumer.negative_acknowledge()``)
+        broker_consumer_stats_cache_time_ms: int, default=30000
+            Sets the time duration for which the broker-side consumer stats 
will be cached in the client.
+        is_read_compacted: bool, default=False
+            Selects whether to read the compacted version of the topic
+        properties: dict, optional
+            Sets the properties for the consumer. The properties associated 
with a consumer can be used to
+            identify a consumer on the broker side.
+        pattern_auto_discovery_period: int, default=60
+            Period, in seconds, for the consumer to auto-discover matching topics.
+        initial_position: InitialPosition, default=InitialPosition.Latest
+          Set the initial position of a consumer when subscribing to the topic.
           It could be either: `InitialPosition.Earliest` or 
`InitialPosition.Latest`.
-          Default: `Latest`.
-        * crypto_key_reader:
-           Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
-           and private key decryption messages for the consumer
-        * replicate_subscription_state_enabled:
-          Set whether the subscription status should be replicated.
-          Default: `False`.
-        * max_pending_chunked_message:
+        crypto_key_reader: CryptoKeyReader, optional
+            Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
+            and private key decryption messages for the consumer
+        replicate_subscription_state_enabled: bool, default=False
+            Set whether the subscription status should be replicated.
+        max_pending_chunked_message: int, default=10
           Consumer buffers chunk messages into memory until it receives all 
the chunks of the original message.
-          While consuming chunk-messages, chunks from same message might not 
be contiguous in the stream and they
+          While consuming chunk-messages, chunks from same message might not 
be contiguous in the stream, and they
           might be mixed with other messages' chunks. so, consumer has to 
maintain multiple buffers to manage
           chunks coming from different messages. This mainly happens when 
multiple publishers are publishing
           messages on the topic concurrently or publisher failed to publish 
all chunks of the messages.
 
           If it's zero, the pending chunked messages will not be limited.
-
-          Default: `10`.
-        * auto_ack_oldest_chunked_message_on_queue_full:
-          Buffering large number of outstanding uncompleted chunked messages 
can create memory pressure and it
+        auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
+          Buffering large number of outstanding uncompleted chunked messages 
can create memory pressure, and it
           can be guarded by providing the maxPendingChunkedMessage threshold. 
See setMaxPendingChunkedMessage.
           Once, consumer reaches this threshold, it drops the outstanding 
unchunked-messages by silently acking
           if autoAckOldestChunkedMessageOnQueueFull is true else it marks them 
for redelivery.
-          Default: `False`.
-        * start_message_id_inclusive:
+        start_message_id_inclusive: bool, default=False
           Set the consumer to include the given position of any reset 
operation like Consumer::seek.
-
-          Default: `False`.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -876,54 +819,60 @@ class Client:
         """
         Create a reader on a particular topic
 
-        **Args**
+        Parameters
+        ----------
+
+        topic:
+            The name of the topic.
+        start_message_id:
+            The initial reader positioning is done by specifying a message id. 
The options are:
+
+            * ``MessageId.earliest``:
+
+            Start reading from the earliest message available in the topic
+
+            * ``MessageId.latest``:
+
+            Start reading from the end of the topic, only getting messages published 
after the reader was created
+
+            * ``MessageId``:
 
-        * `topic`: The name of the topic.
-        * `start_message_id`: The initial reader positioning is done by 
specifying a message id.
-           The options are:
-            * `MessageId.earliest`: Start reading from the earliest message 
available in the topic
-            * `MessageId.latest`: Start reading from the end topic, only 
getting messages published
-               after the reader was created
-            * `MessageId`: When passing a particular message id, the reader 
will position itself on
-               that specific position. The first message to be read will be 
the message next to the
-               specified messageId. Message id can be serialized into a string 
and deserialized
-               back into a `MessageId` object:
+            When passing a particular message id, the reader will position 
itself on that specific position.
+            The first message to be read will be the message next to the 
specified messageId.
+            Message id can be serialized into a string and deserialized back 
into a `MessageId` object:
+
+               .. code-block:: python
 
                    # Serialize to string
                    s = msg.message_id().serialize()
 
                    # Deserialize from string
                    msg_id = MessageId.deserialize(s)
+        schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
+            Define the schema of the data that will be received by this reader.
+        reader_listener: optional
+            Sets a message listener for the reader. When the listener is set, 
the application will receive messages
+            through it. Calls to ``reader.read_next()`` will not be allowed. 
The listener function needs to accept
+            (reader, message), for example:
 
-        **Options**
-
-        * `schema`:
-           Define the schema of the data that will be received by this reader.
-        * `reader_listener`:
-          Sets a message listener for the reader. When the listener is set,
-          the application will receive messages through it. Calls to
-          `reader.read_next()` will not be allowed. The listener function needs
-          to accept (reader, message), for example:
+            .. code-block:: python
 
                 def my_listener(reader, message):
                     # process message
                     pass
-
-        * `receiver_queue_size`:
-          Sets the size of the reader receive queue. The reader receive
-          queue controls how many messages can be accumulated by the reader
-          before the application calls `read_next()`. Using a higher value 
could
-          potentially increase the reader throughput at the expense of higher
-          memory utilization.
-        * `reader_name`:
-          Sets the reader name.
-        * `subscription_role_prefix`:
-          Sets the subscription role prefix.
-        * `is_read_compacted`:
-          Selects whether to read the compacted version of the topic
-        * crypto_key_reader:
-           Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
-           and private key decryption messages for the consumer
+        receiver_queue_size: int, default=1000
+            Sets the size of the reader receive queue. The reader receive 
queue controls how many messages can be
+            accumulated by the reader before the application calls 
`read_next()`. Using a higher value could
+            potentially increase the reader throughput at the expense of 
higher memory utilization.
+        reader_name: str, optional
+            Sets the reader name.
+        subscription_role_prefix: str, optional
+            Sets the subscription role prefix.
+        is_read_compacted: bool, default=False
+            Selects whether to read the compacted version of the topic
+        crypto_key_reader: CryptoKeyReader, optional
+            Symmetric encryption class implementation, configuring public key 
encryption messages for the producer
+            and private key decryption messages for the consumer
         """
         _check_type(str, topic, 'topic')
         _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
@@ -963,8 +912,17 @@ class Client:
 
         This can be used to discover the partitions and create Reader, 
Consumer or Producer
         instances directly on a particular partition.
-        :param topic: the topic name to lookup
-        :return: a list of partition name
+
+        Parameters
+        ----------
+
+        topic: str
+            the topic name to lookup
+
+        Returns
+        -------
+        list
+            a list of partition names
         """
         _check_type(str, topic, 'topic')
         return self._client.get_topic_partitions(topic)
@@ -988,6 +946,19 @@ class Client:
 class Producer:
     """
     The Pulsar message producer, used to publish messages on a topic.
+
+    Examples
+    --------
+
+    .. code-block:: python
+
+        import pulsar
+
+        client = pulsar.Client('pulsar://localhost:6650')
+        producer = client.create_producer('my-topic')
+        for i in range(10):
+            producer.send(('Hello-%d' % i).encode('utf-8'))
+        client.close()
     """
 
     def topic(self):
@@ -1007,12 +978,12 @@ class Producer:
         """
         Get the last sequence id that was published by this producer.
 
-        This represent either the automatically assigned or custom sequence id
-        (set on the `MessageBuilder`) that was published and acknowledged by 
the broker.
+        This represents either the automatically assigned or custom sequence id
+        (set on the ``MessageBuilder``) that was published and acknowledged by 
the broker.
 
         After recreating a producer with the same producer name, this will 
return the
         last message that was published in the previous producer session, or 
-1 if
-        there no message was ever published.
+        there was no message ever published.
         """
         return self._producer.last_sequence_id()
 
@@ -1031,36 +1002,31 @@ class Producer:
 
         Returns a `MessageId` object that represents where the message is 
persisted.
 
-        **Args**
-
-        * `content`:
-          A `bytes` object with the message payload.
-
-        **Options**
-
-        * `properties`:
-          A dict of application-defined string properties.
-        * `partition_key`:
-          Sets the partition key for message routing. A hash of this key is 
used
-          to determine the message's topic partition.
-        * `sequence_id`:
-          Specify a custom sequence id for the message being published.
-        * `replication_clusters`:
-          Override namespace replication clusters. Note that it is the caller's
-          responsibility to provide valid cluster names and that all clusters
-          have been previously configured as topics. Given an empty list,
+        Parameters
+        ----------
+
+        content:
+            A ``bytes`` object with the message payload.
+        properties: optional
+            A dict of application-defined string properties.
+        partition_key: optional
+            Sets the partition key for message routing. A hash of this key is 
used
+            to determine the message's topic partition.
+        sequence_id:  optional
+            Specify a custom sequence id for the message being published.
+        replication_clusters:  optional
+          Override namespace replication clusters. Note that it is the 
caller's responsibility to provide valid
+          cluster names and that all clusters have been previously configured 
as topics. Given an empty list,
           the message will replicate according to the namespace configuration.
-        * `disable_replication`:
-          Do not replicate this message.
-        * `event_timestamp`:
-          Timestamp in millis of the timestamp of event creation
-        * `deliver_at`:
-          Specify the this message should not be delivered earlier than the
-          specified timestamp.
-          The timestamp is milliseconds and based on UTC
-        * `deliver_after`:
-          Specify a delay in timedelta for the delivery of the messages.
-
+        disable_replication: bool, default=False
+            Do not replicate this message.
+        event_timestamp: optional
+            Timestamp in millis of the timestamp of event creation
+        deliver_at: optional
+            Specify that the message should not be delivered earlier than the 
specified timestamp.
+            The timestamp is milliseconds and based on UTC
+        deliver_after: optional
+            Specify a delay in timedelta for the delivery of the messages.
         """
         msg = self._build_msg(content, properties, partition_key, sequence_id,
                               replication_clusters, disable_replication, 
event_timestamp,
@@ -1080,49 +1046,61 @@ class Producer:
         """
         Send a message asynchronously.
 
-        The `callback` will be invoked once the message has been acknowledged
-        by the broker.
+        Examples
+        --------
+
+        The ``callback`` will be invoked once the message has been 
acknowledged by the broker.
+
+        .. code-block:: python
 
-        Example:
+            import pulsar
+
+            client = pulsar.Client('pulsar://localhost:6650')
+            producer = client.create_producer(
+                            'my-topic',
+                            block_if_queue_full=True,
+                            batching_enabled=True,
+                            batching_max_publish_delay_ms=10)
 
-            #!python
             def callback(res, msg_id):
-                print('Message published: %s' % res)
+                print('Message published res=%s' % res)
+
+            for i in range(10):
+                producer.send_async(('Hello-%d' % i).encode('utf-8'), callback)
+
+            client.close()
 
-            producer.send_async(msg, callback)
 
         When the producer queue is full, by default the message will be 
rejected
         and the callback invoked with an error code.
 
-        **Args**
-
-        * `content`:
-          A `bytes` object with the message payload.
-
-        **Options**
-
-        * `properties`:
-          A dict of application0-defined string properties.
-        * `partition_key`:
-          Sets the partition key for the message routing. A hash of this key is
-          used to determine the message's topic partition.
-        * `sequence_id`:
-          Specify a custom sequence id for the message being published.
-        * `replication_clusters`: Override namespace replication clusters. Note
-          that it is the caller's responsibility to provide valid cluster names
-          and that all clusters have been previously configured as topics.
-          Given an empty list, the message will replicate per the namespace
-          configuration.
-        * `disable_replication`:
-          Do not replicate this message.
-        * `event_timestamp`:
-          Timestamp in millis of the timestamp of event creation
-        * `deliver_at`:
-          Specify the this message should not be delivered earlier than the
-          specified timestamp.
-          The timestamp is milliseconds and based on UTC
-        * `deliver_after`:
-          Specify a delay in timedelta for the delivery of the messages.
+
+        Parameters
+        ----------
+
+        content
+            A `bytes` object with the message payload.
+        callback
+            A callback that is invoked once the message has been acknowledged 
by the broker.
+        properties: optional
+            A dict of application-defined string properties.
+        partition_key: optional
+            Sets the partition key for the message routing. A hash of this key 
is
+            used to determine the message's topic partition.
+        sequence_id: optional
+            Specify a custom sequence id for the message being published.
+        replication_clusters: optional
+            Override namespace replication clusters. Note that it is the 
caller's responsibility
+            to provide valid cluster names and that all clusters have been 
previously configured
+            as topics. Given an empty list, the message will replicate per the 
namespace configuration.
+        disable_replication: optional
+            Do not replicate this message.
+        event_timestamp: optional
+            Timestamp in millis of the timestamp of event creation
+        deliver_at: optional
+            Specify that the message should not be delivered earlier than the 
specified timestamp.
+        deliver_after: optional
+            Specify a delay in timedelta for the delivery of the messages.
         """
         msg = self._build_msg(content, properties, partition_key, sequence_id,
                               replication_clusters, disable_replication, 
event_timestamp,
@@ -1191,6 +1169,24 @@ class Producer:
 class Consumer:
     """
     Pulsar consumer.
+
+    Examples
+    --------
+
+    .. code-block:: python
+
+        import pulsar
+
+        client = pulsar.Client('pulsar://localhost:6650')
+        consumer = client.subscribe('my-topic', 'my-subscription')
+        while True:
+            msg = consumer.receive()
+            try:
+                print("Received message '{}' id='{}'".format(msg.data(), 
msg.message_id()))
+                consumer.acknowledge(msg)
+            except Exception:
+                consumer.negative_acknowledge(msg)
+        client.close()
     """
 
     def topic(self):
@@ -1224,11 +1220,11 @@ class Consumer:
         If a message is not immediately available, this method will block until
         a new message is available.
 
-        **Options**
+        Parameters
+        ----------
 
-        * `timeout_millis`:
-          If specified, the receive will raise an exception if a message is not
-          available within the timeout.
+        timeout_millis: int, optional
+            If specified, the receiver will raise an exception if a message is 
not available within the timeout.
         """
         if timeout_millis is None:
             msg = self._consumer.receive()
@@ -1248,10 +1244,11 @@ class Consumer:
         This method will block until an acknowledgement is sent to the broker.
         After that, the message will not be re-delivered to this consumer.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `message`:
-          The received message or message id.
+        message:
+            The received message or message id.
         """
         if isinstance(message, Message):
             self._consumer.acknowledge(message._message)
@@ -1266,10 +1263,11 @@ class Consumer:
         This method will block until an acknowledgement is sent to the broker.
         After that, the messages will not be re-delivered to this consumer.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `message`:
-          The received message or message id.
+        message:
+            The received message or message id.
         """
         if isinstance(message, Message):
             self._consumer.acknowledge_cumulative(message._message)
@@ -1286,10 +1284,11 @@ class Consumer:
 
         This call is not blocking.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `message`:
-          The received message or message id.
+        message:
+            The received message or message id.
         """
         if isinstance(message, Message):
             self._consumer.negative_acknowledge(message._message)
@@ -1298,8 +1297,7 @@ class Consumer:
 
     def pause_message_listener(self):
         """
-        Pause receiving messages via the `message_listener` until
-        `resume_message_listener()` is called.
+        Pause receiving messages via the ``message_listener`` until 
`resume_message_listener()` is called.
         """
         self._consumer.pause_message_listener()
 
@@ -1329,10 +1327,11 @@ class Consumer:
         Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the
         seek() on the individual partitions.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `message`:
-          The message id for seek, OR an integer event time to seek to
+        messageid:
+            The message id for seek, OR an integer event time to seek to
         """
         self._consumer.seek(messageid)
 
@@ -1374,11 +1373,11 @@ class Reader:
         If a message is not immediately available, this method will block until
         a new message is available.
 
-        **Options**
+        Parameters
+        ----------
 
-        * `timeout_millis`:
-          If specified, the receive will raise an exception if a message is not
-          available within the timeout.
+        timeout_millis: int, optional
+            If specified, the receiver will raise an exception if a message is 
not available within the timeout.
         """
         if timeout_millis is None:
             msg = self._reader.read_next()
@@ -1404,10 +1403,11 @@ class Reader:
         Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the
         seek() on the individual partitions.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `message`:
-          The message id for seek, OR an integer event time to seek to
+        messageid:
+            The message id for seek, OR an integer event time to seek to
         """
         self._reader.seek(messageid)
 
@@ -1433,10 +1433,13 @@ class CryptoKeyReader:
         """
         Create crypto key reader.
 
-        **Args**
+        Parameters
+        ----------
 
-        * `public_key_path`: Path to the public key
-        * `private_key_path`: Path to private key
+        public_key_path: str
+            Path to the public key
+        private_key_path: str
+            Path to private key
         """
         _check_type(str, public_key_path, 'public_key_path')
         _check_type(str, private_key_path, 'private_key_path')
@@ -1447,9 +1450,11 @@ class ConsoleLogger:
     """
     Logger that writes on standard output
 
-        **Args**
+    Attributes
+    ----------
 
-        * `log_level`: The logging level. eg: `pulsar.LoggerLevel.Info`
+    log_level:
+        The logging level, eg: ``pulsar.LoggerLevel.Info``
     """
     def __init__(self, log_level=_pulsar.LoggerLevel.Info):
         _check_type(_pulsar.LoggerLevel, log_level, 'log_level')
@@ -1460,10 +1465,13 @@ class FileLogger:
     """
     Logger that writes into a file
 
-        **Args**
+    Attributes
+    ----------
 
-        * `log_level`: The logging level. eg: `pulsar.LoggerLevel.Info`
-        * `log_file`: The file where to write the logs
+    log_level:
+        The logging level, eg: ``pulsar.LoggerLevel.Info``
+    log_file:
+        The file where to write the logs
     """
     def __init__(self, log_level, log_file):
         _check_type(_pulsar.LoggerLevel, log_level, 'log_level')
diff --git a/pulsar/functions/context.py b/pulsar/functions/context.py
index c1f6801..51b86f0 100644
--- a/pulsar/functions/context.py
+++ b/pulsar/functions/context.py
@@ -36,156 +36,159 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-"""context.py: Context defines context information available during
-# processing of a request.
+"""
+Context defines context information available during processing of a request.
 """
 from abc import abstractmethod
 
+
 class Context(object):
-  """Interface defining information available at process time"""
-  @abstractmethod
-  def get_message_id(self):
-    """Return the messageid of the current message that we are processing"""
-    pass
-
-  @abstractmethod
-  def get_message_key(self):
-    """Return the key of the current message that we are processing"""
-    pass
-
-  @abstractmethod
-  def get_message_eventtime(self):
-    """Return the event time of the current message that we are processing"""
-    pass
-
-  @abstractmethod
-  def get_message_properties(self):
-    """Return the message properties kv map of the current message that we are processing"""
-    pass
-
-  @abstractmethod
-  def get_current_message_topic_name(self):
-    """Returns the topic name of the message that we are processing"""
-    pass
-  
-  @abstractmethod
-  def get_function_tenant(self):
-    """Returns the tenant of the message that's being processed"""
-    pass
-
-  @abstractmethod
-  def get_function_namespace(self):
-    """Returns the namespace of the message that's being processed"""
-
-  @abstractmethod
-  def get_function_name(self):
-    """Returns the function name that we are a part of"""
-    pass
-
-  @abstractmethod
-  def get_function_id(self):
-    """Returns the function id that we are a part of"""
-    pass
-
-  @abstractmethod
-  def get_instance_id(self):
-    """Returns the instance id that is executing the function"""
-    pass
-
-  @abstractmethod
-  def get_function_version(self):
-    """Returns the version of function that we are executing"""
-    pass
-
-  @abstractmethod
-  def get_logger(self):
-    """Returns the logger object that can be used to do logging"""
-    pass
-
-  @abstractmethod
-  def get_user_config_value(self, key):
-    """Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
-    pass
-  
-  @abstractmethod
-  def get_user_config_map(self):
-    """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
-    pass
-
-  @abstractmethod
-  def get_secret(self, secret_name):
-    """Returns the secret value associated with the name. None if nothing was found"""
-    pass
-
-  @abstractmethod
-  def get_partition_key(self):
-    """Returns partition key of the input message is one exists"""
-    pass
-
-
-  @abstractmethod
-  def record_metric(self, metric_name, metric_value):
-    """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
-    pass
-
-  @abstractmethod
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
-    """Publishes message to topic_name by first serializing the message using serde_class_name serde
-    The message will have properties specified if any
-
-    The available options for message_conf:
-
-      properties,
-      partition_key,
-      sequence_id,
-      replication_clusters,
-      disable_replication,
-      event_timestamp
-
-    """
-    pass
-
-  @abstractmethod
-  def get_input_topics(self):
-    """Returns the input topics of function"""
-    pass
-
-  @abstractmethod
-  def get_output_topic(self):
-    """Returns the output topic of function"""
-    pass
-
-  @abstractmethod
-  def get_output_serde_class_name(self):
-    """return output Serde class"""
-    pass
-
-  @abstractmethod
-  def ack(self, msgid, topic):
-    """ack this message id"""
-    pass
-
-  @abstractmethod
-  def incr_counter(self, key, amount):
-    """incr the counter of a given key in the managed state"""
-    pass
-
-  @abstractmethod
-  def get_counter(self, key):
-    """get the counter of a given key in the managed state"""
-    pass
-
-  @abstractmethod
-  def del_counter(self, key):
-    """delete the counter of a given key in the managed state"""
-    pass
-
-  @abstractmethod
-  def put_state(self, key, value):
-    """update the value of a given key in the managed state"""
-    pass
-
-  @abstractmethod
-  def get_state(self, key):
-    """get the value of a given key in the managed state"""
-    pass
+    """Interface defining information available at process time"""
+
+    @abstractmethod
+    def get_message_id(self):
+        """Return the messageid of the current message that we are processing"""
+        pass
+
+    @abstractmethod
+    def get_message_key(self):
+        """Return the key of the current message that we are processing"""
+        pass
+
+    @abstractmethod
+    def get_message_eventtime(self):
+        """Return the event time of the current message that we are processing"""
+        pass
+
+    @abstractmethod
+    def get_message_properties(self):
+        """Return the message properties kv map of the current message that we are processing"""
+        pass
+
+    @abstractmethod
+    def get_current_message_topic_name(self):
+        """Returns the topic name of the message that we are processing"""
+        pass
+
+    @abstractmethod
+    def get_function_tenant(self):
+        """Returns the tenant of the message that's being processed"""
+        pass
+
+    @abstractmethod
+    def get_function_namespace(self):
+        """Returns the namespace of the message that's being processed"""
+
+    @abstractmethod
+    def get_function_name(self):
+        """Returns the function name that we are a part of"""
+        pass
+
+    @abstractmethod
+    def get_function_id(self):
+        """Returns the function id that we are a part of"""
+        pass
+
+    @abstractmethod
+    def get_instance_id(self):
+        """Returns the instance id that is executing the function"""
+        pass
+
+    @abstractmethod
+    def get_function_version(self):
+        """Returns the version of function that we are executing"""
+        pass
+
+    @abstractmethod
+    def get_logger(self):
+        """Returns the logger object that can be used to do logging"""
+        pass
+
+    @abstractmethod
+    def get_user_config_value(self, key):
+        """Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
+        pass
+
+    @abstractmethod
+    def get_user_config_map(self):
+        """Returns the entire user-defined config as a dict
+        (the dict will be empty if no user-defined config is supplied)"""
+        pass
+
+    @abstractmethod
+    def get_secret(self, secret_name):
+        """Returns the secret value associated with the name. None if nothing was found"""
+        pass
+
+    @abstractmethod
+    def get_partition_key(self):
+        """Returns partition key of the input message is one exists"""
+        pass
+
+    @abstractmethod
+    def record_metric(self, metric_name, metric_value):
+        """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
+        pass
+
+    @abstractmethod
+    def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None,
+                compression_type=None, callback=None, message_conf=None):
+        """Publishes message to topic_name by first serializing the message using serde_class_name serde
+        The message will have properties specified if any
+
+        The available options for message_conf:
+
+          properties,
+          partition_key,
+          sequence_id,
+          replication_clusters,
+          disable_replication,
+          event_timestamp
+
+        """
+        pass
+
+    @abstractmethod
+    def get_input_topics(self):
+        """Returns the input topics of function"""
+        pass
+
+    @abstractmethod
+    def get_output_topic(self):
+        """Returns the output topic of function"""
+        pass
+
+    @abstractmethod
+    def get_output_serde_class_name(self):
+        """return output Serde class"""
+        pass
+
+    @abstractmethod
+    def ack(self, msgid, topic):
+        """ack this message id"""
+        pass
+
+    @abstractmethod
+    def incr_counter(self, key, amount):
+        """incr the counter of a given key in the managed state"""
+        pass
+
+    @abstractmethod
+    def get_counter(self, key):
+        """get the counter of a given key in the managed state"""
+        pass
+
+    @abstractmethod
+    def del_counter(self, key):
+        """delete the counter of a given key in the managed state"""
+        pass
+
+    @abstractmethod
+    def put_state(self, key, value):
+        """update the value of a given key in the managed state"""
+        pass
+
+    @abstractmethod
+    def get_state(self, key):
+        """get the value of a given key in the managed state"""
+        pass
diff --git a/pulsar/functions/function.py b/pulsar/functions/function.py
index ce2919d..107d25d 100644
--- a/pulsar/functions/function.py
+++ b/pulsar/functions/function.py
@@ -36,16 +36,20 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-"""function.py: This is the core interface of the function api.
-# The process method is called for every message of the input topic of the
-# function. The incoming input bytes are deserialized using the serde.
-# The process function can optionally emit an output
+"""
+This is the core interface of the function api.
+
+The process method is called for every message of the input topic of the
+function. The incoming input bytes are deserialized using the serde.
+The process function can optionally emit an output
 """
 from abc import abstractmethod
 
+
 class Function(object):
-  """Interface for Pulsar Function"""
-  @abstractmethod
-  def process(self, input, context):
-    """Process input message"""
-    pass
\ No newline at end of file
+    """Interface for Pulsar Function"""
+
+    @abstractmethod
+    def process(self, input, context):
+        """Process input message"""
+        pass
diff --git a/pulsar/functions/serde.py b/pulsar/functions/serde.py
index 7b07673..367d55c 100644
--- a/pulsar/functions/serde.py
+++ b/pulsar/functions/serde.py
@@ -36,52 +36,59 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-"""serde.py: SerDe defines the interface for serialization/deserialization.
-# Everytime a message is read from pulsar topic, the serde is invoked to
-# serialize the bytes into an object before invoking the process method.
-# Anytime a python object needs to be written back to pulsar, it is
-# serialized into bytes before writing.
 """
-from abc import abstractmethod
+SerDe defines the interface for serialization/deserialization.
 
+Everytime a message is read from pulsar topic, the serde is invoked to
+serialize the bytes into an object before invoking the process method.
+Anytime a python object needs to be written back to pulsar, it is
+serialized into bytes before writing.
+"""
 import pickle
+from abc import abstractmethod
+
 
 class SerDe(object):
-  """Interface for Serialization/Deserialization"""
-  @abstractmethod
-  def serialize(self, input):
-    """Serialize input message into bytes"""
-    pass
+    """Interface for Serialization/Deserialization"""
+
+    @abstractmethod
+    def serialize(self, input):
+        """Serialize input message into bytes"""
+        pass
+
+    @abstractmethod
+    def deserialize(self, input_bytes):
+        """Serialize input_bytes into an object"""
+        pass
 
-  @abstractmethod
-  def deserialize(self, input_bytes):
-    """Serialize input_bytes into an object"""
-    pass
 
 class PickleSerDe(SerDe):
-  """Pickle based serializer"""
-  def serialize(self, input):
-      return pickle.dumps(input)
+    """Pickle based serializer"""
+
+    def serialize(self, input):
+        return pickle.dumps(input)
+
+    def deserialize(self, input_bytes):
+        return pickle.loads(input_bytes)
 
-  def deserialize(self, input_bytes):
-      return pickle.loads(input_bytes)
 
 class IdentitySerDe(SerDe):
-  """Simple Serde that just conversion to string and back"""
-  def __init__(self):
-    self._types = [int, float, complex, str]
-
-  def serialize(self, input):
-    if type(input) in self._types:
-      return str(input).encode('utf-8')
-    if type(input) == bytes:
-      return input
-    raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input))
-
-  def deserialize(self, input_bytes):
-    for typ in self._types:
-      try:
-        return typ(input_bytes.decode('utf-8'))
-      except:
-        pass
-    return input_bytes
+    """Simple Serde that just conversion to string and back"""
+
+    def __init__(self):
+        self._types = [int, float, complex, str]
+
+    def serialize(self, input):
+        if type(input) in self._types:
+            return str(input).encode('utf-8')
+        if type(input) == bytes:
+            return input
+        raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input))
+
+    def deserialize(self, input_bytes):
+        for typ in self._types:
+            try:
+                return typ(input_bytes.decode('utf-8'))
+            except:
+                pass
+        return input_bytes

Reply via email to