aaron-ai commented on code in PR #559:
URL: https://github.com/apache/rocketmq-clients/pull/559#discussion_r1259090384


##########
python/rocketmq/client.py:
##########
@@ -13,48 +13,137 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import asyncio
 import threading
 from typing import Set
 
-from protocol import service_pb2
-from protocol.service_pb2 import QueryRouteRequest
+from protocol import definition_pb2, service_pb2
+from protocol.service_pb2 import HeartbeatRequest, QueryRouteRequest
 from rocketmq.client_config import ClientConfig
 from rocketmq.client_id_encoder import ClientIdEncoder
-from rocketmq.definition import TopicRouteData
+from rocketmq.definition import Resource, TopicRouteData
+from rocketmq.log import logger
 from rocketmq.rpc_client import Endpoints, RpcClient
 from rocketmq.session import Session
 from rocketmq.signature import Signature
 
 
+class ScheduleWithFixedDelay:
+    def __init__(self, action, delay, period):
+        self.action = action
+        self.delay = delay
+        self.period = period
+        self.task = None
+
+    async def start(self):
+        # await asyncio.sleep(self.delay)
+        while True:
+            await self.action()
+            await asyncio.sleep(self.period)
+
+    def schedule(self):
+        loop1 = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop1)
+        self.task = asyncio.create_task(self.start())
+
+    def cancel(self):
+        if self.task:
+            self.task.cancel()
+
+
 class Client:
+    """
+    Main client class which handles interaction with the server.
+    """
     def __init__(self, client_config: ClientConfig, topics: Set[str]):
+        """
+        Initialization method for the Client class.
+
+        :param client_config: Client configuration.
+        :param topics: Set of topics that the client is subscribed to.
+        """
         self.client_config = client_config
         self.client_id = ClientIdEncoder.generate()
         self.endpoints = client_config.endpoints
         self.topics = topics
 
+        #: A cache to store topic routes.
         self.topic_route_cache = {}
 
+        #: A table to store session information.
         self.sessions_table = {}
         self.sessionsLock = threading.Lock()
         self.client_manager = ClientManager(self)
 
+        #: A dictionary to store isolated items.
+        self.isolated = dict()
+
     async def start(self):
+        """
+        Start method which initiates fetching of topic routes and schedules heartbeats.
+        """
         # get topic route
         for topic in self.topics:
             self.topic_route_cache[topic] = await self.fetch_topic_route(topic)
+        scheduler = ScheduleWithFixedDelay(self.heartbeat, 3, 12)
+        scheduler.schedule()
+
+    async def heartbeat(self):
+        """
+        Asynchronous method that sends a heartbeat to the server.
+        """
+        try:
+            endpoints = self.GetTotalRouteEndpoints()
+            request = HeartbeatRequest()
+            request.client_type = definition_pb2.PRODUCER
+            topic = Resource()
+            topic.name = "normal_topic"
+            invocations = {}
+            logger.info(len(endpoints))
+            # Collect task into a map.
+            for item in endpoints:
+                task = await self.client_manager.heartbeat(item, request, self.client_config.request_timeout)
+                invocations[item] = task
+                logger.info(task)
+                logger.info("finish")
+                break
+        except Exception as e:
+            logger.error(f"[Bug] unexpected exception raised during heartbeat, clientId={self.client_id}, Exception: {str(e)}")
 
     def GetTotalRouteEndpoints(self):

Review Comment:
   Please use snake_case naming rather than CamelCase for method names, e.g. `get_total_route_endpoints` instead of `GetTotalRouteEndpoints`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to