This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c8a94f26 Add MessageIdCodec class (#530)
c8a94f26 is described below
commit c8a94f264c891e3ea679c45a0ef5b9ad7fee17b0
Author: Yan Chao Mei <[email protected]>
AuthorDate: Fri Jun 2 16:14:46 2023 +0800
Add MessageIdCodec class (#530)
* add MessageIdCodec class
* add license
* fix style issue
* rename message_id_codec file
* fix process id overflow issue
* Update python/client/message/message_id_codec.py
* Fix style issue
---------
Co-authored-by: Aaron Ai <[email protected]>
---
.github/workflows/python_build.yml | 2 +-
python/client/message/message_id_codec.py | 81 +++++++++++++++++++++++++++++++
python/client/rpc_client.py | 22 ++++++---
3 files changed, 97 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 4524a784..2fd44c09 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -15,7 +15,7 @@ jobs:
- run: python -m pip install flake8
- name: flake8
run: |
- flake8 --ignore=E501 --exclude python/protocol python
+ flake8 --ignore=E501,W503 --exclude python/protocol python
isort:
runs-on: ubuntu-latest
steps:
diff --git a/python/client/message/message_id_codec.py b/python/client/message/message_id_codec.py
new file mode 100644
index 00000000..0a52c831
--- /dev/null
+++ b/python/client/message/message_id_codec.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+ __MESSAGE_ID_VERSION_V1 = "01"
+
+ @staticmethod
+ def __get_process_fixed_string():
+ mac = uuid.getnode()
+ mac = format(mac, "012x")
+ mac_bytes = bytes.fromhex(mac[-12:])
+ pid = os.getpid() % 65536
+ pid_bytes = pid.to_bytes(2, "big")
+ return mac_bytes.hex().upper() + pid_bytes.hex().upper()
+
+ @staticmethod
+ def __get_seconds_since_custom_epoch():
+ custom_epoch = datetime(2021, 1, 1, tzinfo=timezone.utc)
+ now = datetime.now(timezone.utc)
+ return int((now - custom_epoch).total_seconds())
+
+ __PROCESS_FIXED_STRING_V1 = __get_process_fixed_string()
+ __SECONDS_SINCE_CUSTOM_EPOCH = __get_seconds_since_custom_epoch()
+ __SECONDS_START_TIMESTAMP = int(time.time())
+
+ @staticmethod
+ def __delta_seconds():
+ return (
+ int(time.time())
+ - MessageIdCodec.__SECONDS_START_TIMESTAMP
+ + MessageIdCodec.__SECONDS_SINCE_CUSTOM_EPOCH
+ )
+
+ @staticmethod
+ def __int_to_bytes_with_big_endian(number: int, min_bytes: int):
+ num_bytes = max(math.ceil(number.bit_length() / 8), min_bytes)
+ return number.to_bytes(num_bytes, "big")
+
+ __SEQUENCE = 0
+ __SEQUENCE_LOCK = threading.Lock()
+
+ @staticmethod
+ def __get_and_increment_sequence():
+ with MessageIdCodec.__SEQUENCE_LOCK:
+ temp = MessageIdCodec.__SEQUENCE
+ MessageIdCodec.__SEQUENCE += 1
+ return temp
+
+ @staticmethod
+ def next_message_id():
+ seconds = MessageIdCodec.__delta_seconds()
+ seconds_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(seconds, 4)[-4:]
+ sequence_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(
+ MessageIdCodec.__get_and_increment_sequence(), 4
+ )[-4:]
+ return (
+ MessageIdCodec.__MESSAGE_ID_VERSION_V1
+ + MessageIdCodec.__PROCESS_FIXED_STRING_V1
+ + seconds_bytes.hex().upper()
+ + sequence_bytes.hex().upper()
+ )
diff --git a/python/client/rpc_client.py b/python/client/rpc_client.py
index cc1dfd4a..1ed9b5e3 100644
--- a/python/client/rpc_client.py
+++ b/python/client/rpc_client.py
@@ -26,15 +26,19 @@ class RpcClient:
def __init__(self, endpoints, ssl_enabled):
channel_options = [
- ('grpc.max_send_message_length', -1),
- ('grpc.max_receive_message_length', -1),
- ('grpc.connect_timeout_ms', self.CONNECT_TIMEOUT_MILLIS),
+ ("grpc.max_send_message_length", -1),
+ ("grpc.max_receive_message_length", -1),
+ ("grpc.connect_timeout_ms", self.CONNECT_TIMEOUT_MILLIS),
]
if ssl_enabled:
ssl_credentials = ssl_channel_credentials()
- self.channel = aio.secure_channel(endpoints.getGrpcTarget(), ssl_credentials, options=channel_options)
+ self.channel = aio.secure_channel(
+ endpoints.getGrpcTarget(), ssl_credentials, options=channel_options
+ )
else:
- self.channel = insecure_channel(endpoints.getGrpcTarget(), options=channel_options)
+ self.channel = insecure_channel(
+ endpoints.getGrpcTarget(), options=channel_options
+ )
self.activity_nano_time = time.monotonic_ns()
@@ -42,7 +46,9 @@ class RpcClient:
self.channel.close()
def idle_duration(self):
- return timedelta(microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000)
+ return timedelta(
+ microseconds=(time.monotonic_ns() - self.activity_nano_time) / 1000
+ )
async def query_route(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
@@ -81,7 +87,9 @@ class RpcClient:
async def forward_message_to_dead_letter_queue(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()
stub = service.MessagingServiceStub(self.channel)
- return await stub.ForwardMessageToDeadLetterQueue(request, timeout=timeout_seconds)
+ return await stub.ForwardMessageToDeadLetterQueue(
+ request, timeout=timeout_seconds
+ )
async def end_transaction(self, request, timeout_seconds: int):
self.activity_nano_time = time.monotonic_ns()