aaron-ai commented on code in PR #559:
URL: https://github.com/apache/rocketmq-clients/pull/559#discussion_r1252478609
##########
python/rocketmq/producer.py:
##########
@@ -108,25 +113,111 @@ async def shutdown(self):
logger.info(f"Begin to shutdown the rocketmq producer,
client_id={self.client_id}")
logger.info(f"Shutdown the rocketmq producer successfully,
client_id={self.client_id}")
- async def send_message(self, message):
+ @staticmethod
+ def wrap_send_message_request(message, message_queue):
req = SendMessageRequest()
- req.messages.extend([message])
- topic_data = self.topic_route_cache["normal_topic"]
- endpoints = topic_data.message_queues[2].broker.endpoints
- return await self.client_manager.send_message(endpoints, req, 10)
+ req.messages.extend([message.to_protobuf(message_queue.queue_id)])
+ return req
+
+ async def send_message(self, message):
+ # get load balancer
+ publish_load_balancer = await
self.get_publish_load_balancer(message.topic)
+ publishing_message = PublishingMessage(message, self.publish_settings)
+ retry_policy = self.get_retry_policy()
+ maxAttempts = retry_policy.get_max_attempts()
+
+ exception = None
+
+ candidates = (
+
publish_load_balancer.take_message_queues(set(self.isolated.keys()),
maxAttempts)
+ if publishing_message.message.message_group is None else
+
[publish_load_balancer.take_message_queue_by_message_group(publishing_message.message.message_group)]
+ )
+ print(candidates)
+ for attempt in range(1, maxAttempts + 1):
+ stopwatch_start = time.time()
+
+ candidateIndex = (attempt - 1) % len(candidates)
+ mq = candidates[candidateIndex]
+ # print(mq.accept_message_types[0] ==
publishing_message.message_type)
+ # print((mq.accept_message_types[0].value))
+ # print((publishing_message.message_type)) TODO check why it's
wrong using not in
+ if self.publish_settings.is_validate_message_type() and
publishing_message.message_type.value != mq.accept_message_types[0].value:
+ raise ValueError(
+ "Current message type does not match with the accept
message types," +
+ f" topic={message.topic},
actualMessageType={publishing_message.message_type}" +
+ f" acceptMessageType={','}")
+
+ sendMessageRequest =
self.wrap_send_message_request(publishing_message, mq)
+ topic_data = self.topic_route_cache["normal_topic"]
+ endpoints = topic_data.message_queues[2].broker.endpoints
+ try:
+ invocation = await self.client_manager.send_message(endpoints,
sendMessageRequest, self.client_config.request_timeout)
+ # sendReceipts = SendReceipt.process_send_message_response(mq,
invocation)
+ print(invocation)
+ # sendReceipt = sendReceipts[0]
+ if attempt > 1:
+ logger.info(
+ f"Re-send message successfully,
topic={message.topic}," +
+ f" maxAttempts={maxAttempts}, endpoints=,
clientId={self.client_id}")
+
+ # return sendReceipt
+ except Exception as e:
+ exception = e
+ self.isolated[endpoints] = True
+ if attempt >= maxAttempts:
+ logger.info(f"Failed to send message finally, run out of
attempt times, " +
+ f"topic={message.topic},
maxAttempt={maxAttempts}, attempt={attempt}, " +
+ f"endpoints={endpoints},
messageId={message.message_id}, clientId={self.client_id}")
+ raise
+
+ if message.message_type == "Transaction":
+ logger.info(f"Failed to send transaction message, run out
of attempt times, " +
+ f"topic={message.topic}, maxAttempt=1,
attempt={attempt}, " +
+ f"endpoints={endpoints},
messageId={message.message_id}, clientId={self.client_id}")
+ raise
+
+ if not isinstance(exception, TooManyRequestsException):
Review Comment:
where is `TooManyRequestsException`?
##########
python/rocketmq/producer.py:
##########
@@ -108,25 +113,111 @@ async def shutdown(self):
logger.info(f"Begin to shutdown the rocketmq producer,
client_id={self.client_id}")
logger.info(f"Shutdown the rocketmq producer successfully,
client_id={self.client_id}")
- async def send_message(self, message):
+ @staticmethod
+ def wrap_send_message_request(message, message_queue):
req = SendMessageRequest()
- req.messages.extend([message])
- topic_data = self.topic_route_cache["normal_topic"]
- endpoints = topic_data.message_queues[2].broker.endpoints
- return await self.client_manager.send_message(endpoints, req, 10)
+ req.messages.extend([message.to_protobuf(message_queue.queue_id)])
+ return req
+
+ async def send_message(self, message):
+ # get load balancer
+ publish_load_balancer = await
self.get_publish_load_balancer(message.topic)
+ publishing_message = PublishingMessage(message, self.publish_settings)
+ retry_policy = self.get_retry_policy()
+ maxAttempts = retry_policy.get_max_attempts()
+
+ exception = None
+
+ candidates = (
+
publish_load_balancer.take_message_queues(set(self.isolated.keys()),
maxAttempts)
+ if publishing_message.message.message_group is None else
+
[publish_load_balancer.take_message_queue_by_message_group(publishing_message.message.message_group)]
+ )
+ print(candidates)
+ for attempt in range(1, maxAttempts + 1):
+ stopwatch_start = time.time()
+
+ candidateIndex = (attempt - 1) % len(candidates)
+ mq = candidates[candidateIndex]
+ # print(mq.accept_message_types[0] ==
publishing_message.message_type)
+ # print((mq.accept_message_types[0].value))
+ # print((publishing_message.message_type)) TODO check why it's
wrong using not in
+ if self.publish_settings.is_validate_message_type() and
publishing_message.message_type.value != mq.accept_message_types[0].value:
+ raise ValueError(
+ "Current message type does not match with the accept
message types," +
+ f" topic={message.topic},
actualMessageType={publishing_message.message_type}" +
+ f" acceptMessageType={','}")
+
+ sendMessageRequest =
self.wrap_send_message_request(publishing_message, mq)
+ topic_data = self.topic_route_cache["normal_topic"]
+ endpoints = topic_data.message_queues[2].broker.endpoints
+ try:
+ invocation = await self.client_manager.send_message(endpoints,
sendMessageRequest, self.client_config.request_timeout)
+ # sendReceipts = SendReceipt.process_send_message_response(mq,
invocation)
+ print(invocation)
+ # sendReceipt = sendReceipts[0]
+ if attempt > 1:
+ logger.info(
+ f"Re-send message successfully,
topic={message.topic}," +
+ f" maxAttempts={maxAttempts}, endpoints=,
clientId={self.client_id}")
+
+ # return sendReceipt
+ except Exception as e:
+ exception = e
+ self.isolated[endpoints] = True
+ if attempt >= maxAttempts:
+ logger.info(f"Failed to send message finally, run out of
attempt times, " +
+ f"topic={message.topic},
maxAttempt={maxAttempts}, attempt={attempt}, " +
+ f"endpoints={endpoints},
messageId={message.message_id}, clientId={self.client_id}")
+ raise
+
+ if message.message_type == "Transaction":
Review Comment:
Enumeration should be used here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]