This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c8a94f26 Add MessageIdCodec class (#530)
c8a94f26 is described below

commit c8a94f264c891e3ea679c45a0ef5b9ad7fee17b0
Author: Yan Chao Mei <[email protected]>
AuthorDate: Fri Jun 2 16:14:46 2023 +0800

    Add MessageIdCodec class (#530)
    
    * add MessageIdCodec class
    
    * add license
    
    * fix style issue
    
    * rename message_id_codec file
    
    * fix process id overflow issue
    
    * Update python/client/message/message_id_codec.py
    
    * Fix style issue
    
    ---------
    
    Co-authored-by: Aaron Ai <[email protected]>
---
 .github/workflows/python_build.yml        |  2 +-
 python/client/message/message_id_codec.py | 81 +++++++++++++++++++++++++++++++
 python/client/rpc_client.py               | 22 ++++++---
 3 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 4524a784..2fd44c09 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -15,7 +15,7 @@ jobs:
       - run: python -m pip install flake8
       - name: flake8
         run: |
-          flake8 --ignore=E501 --exclude python/protocol python
+          flake8 --ignore=E501,W503 --exclude python/protocol python
   isort:
     runs-on: ubuntu-latest
     steps:
diff --git a/python/client/message/message_id_codec.py b/python/client/message/message_id_codec.py
new file mode 100644
index 00000000..0a52c831
--- /dev/null
+++ b/python/client/message/message_id_codec.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+    __MESSAGE_ID_VERSION_V1 = "01"
+
+    @staticmethod
+    def __get_process_fixed_string():
+        mac = uuid.getnode()
+        mac = format(mac, "012x")
+        mac_bytes = bytes.fromhex(mac[-12:])
+        pid = os.getpid() % 65536
+        pid_bytes = pid.to_bytes(2, "big")
+        return mac_bytes.hex().upper() + pid_bytes.hex().upper()
+
+    @staticmethod
+    def __get_seconds_since_custom_epoch():
+        custom_epoch = datetime(2021, 1, 1, tzinfo=timezone.utc)
+        now = datetime.now(timezone.utc)
+        return int((now - custom_epoch).total_seconds())
+
+    __PROCESS_FIXED_STRING_V1 = __get_process_fixed_string()
+    __SECONDS_SINCE_CUSTOM_EPOCH = __get_seconds_since_custom_epoch()
+    __SECONDS_START_TIMESTAMP = int(time.time())
+
+    @staticmethod
+    def __delta_seconds():
+        return (
+            int(time.time())
+            - MessageIdCodec.__SECONDS_START_TIMESTAMP
+            + MessageIdCodec.__SECONDS_SINCE_CUSTOM_EPOCH
+        )
+
+    @staticmethod
+    def __int_to_bytes_with_big_endian(number: int, min_bytes: int):
+        num_bytes = max(math.ceil(number.bit_length() / 8), min_bytes)
+        return number.to_bytes(num_bytes, "big")
+
+    __SEQUENCE = 0
+    __SEQUENCE_LOCK = threading.Lock()
+
+    @staticmethod
+    def __get_and_increment_sequence():
+        with MessageIdCodec.__SEQUENCE_LOCK:
+            temp = MessageIdCodec.__SEQUENCE
+            MessageIdCodec.__SEQUENCE += 1
+            return temp
+
+    @staticmethod
+    def next_message_id():
+        seconds = MessageIdCodec.__delta_seconds()
+        seconds_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(seconds, 4)[-4:]
+        sequence_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(
+            MessageIdCodec.__get_and_increment_sequence(), 4
+        )[-4:]
+        return (
+            MessageIdCodec.__MESSAGE_ID_VERSION_V1
+            + MessageIdCodec.__PROCESS_FIXED_STRING_V1
+            + seconds_bytes.hex().upper()
+            + sequence_bytes.hex().upper()
+        )
diff --git a/python/client/rpc_client.py b/python/client/rpc_client.py
index cc1dfd4a..1ed9b5e3 100644
--- a/python/client/rpc_client.py
+++ b/python/client/rpc_client.py
@@ -26,15 +26,19 @@ class RpcClient:
 
     def __init__(self, endpoints, ssl_enabled):
         channel_options = [
-            ('grpc.max_send_message_length', -1),
-            ('grpc.max_receive_message_length', -1),
-            ('grpc.connect_timeout_ms', self.CONNECT_TIMEOUT_MILLIS),
+            ("grpc.max_send_message_length", -1),
+            ("grpc.max_receive_message_length", -1),
+            ("grpc.connect_timeout_ms", self.CONNECT_TIMEOUT_MILLIS),
         ]
         if ssl_enabled:
             ssl_credentials = ssl_channel_credentials()
-            self.channel = aio.secure_channel(endpoints.getGrpcTarget(), ssl_credentials, options=channel_options)
+            self.channel = aio.secure_channel(
+                endpoints.getGrpcTarget(), ssl_credentials, options=channel_options
+            )
         else:
-            self.channel = insecure_channel(endpoints.getGrpcTarget(), options=channel_options)
+            self.channel = insecure_channel(
+                endpoints.getGrpcTarget(), options=channel_options
+            )
 
         self.activity_nano_time = time.monotonic_ns()
 
@@ -42,7 +46,9 @@ class RpcClient:
         self.channel.close()
 
     def idle_duration(self):
-        return timedelta(microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000)
+        return timedelta(
+            microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000
+        )
 
     async def query_route(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
@@ -81,7 +87,9 @@ class RpcClient:
     async def forward_message_to_dead_letter_queue(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()
         stub = service.MessagingServiceStub(self.channel)
-        return await stub.ForwardMessageToDeadLetterQueue(request, timeout=timeout_seconds)
+        return await stub.ForwardMessageToDeadLetterQueue(
+            request, timeout=timeout_seconds
+        )
 
     async def end_transaction(self, request, timeout_seconds: int):
         self.activity_nano_time = time.monotonic_ns()

Reply via email to