This is an automated email from the ASF dual-hosted git repository.
jialiang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 69ed31c19c AMBARI-26099. Ambari Common Stomp Module AttributeError (#3798)
69ed31c19c is described below
commit 69ed31c19c32531621056f0a55b16975e911ca2f
Author: Peng Lee <[email protected]>
AuthorDate: Thu Aug 15 08:51:57 2024 +0800
AMBARI-26099. Ambari Common Stomp Module AttributeError (#3798)
---
.../test/python/ambari_agent/TestAmbariStomp.py | 41 ++++++++++++++++++++++
.../src/main/python/ambari_stomp/transport.py | 8 ++---
.../src/test/python/coilmq/util/frames.py | 17 +++++----
3 files changed, 55 insertions(+), 11 deletions(-)
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAmbariStomp.py b/ambari-agent/src/test/python/ambari_agent/TestAmbariStomp.py
new file mode 100644
index 0000000000..021cfedfce
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestAmbariStomp.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from BaseStompServerTestCase import BaseStompServerTestCase
+import ambari_stomp
+
+class TestAmbariStomp(BaseStompServerTestCase):
+ def test_ambari_stomp(self):
+ conn = ambari_stomp.Connection10([('127.0.0.1', 21613)])
+ conn.set_listener('', MyListener())
+ conn.start()
+ try:
+ conn.connect("test", "test", wait=False)
+ conn.subscribe(destination='/queue/test', id=1, ack='auto')
+ conn.send(body='some text', destination='/queue/test', headers={"test": 'aaa', "test2": "bbb"})
+ except AttributeError as e:
+ raise AttributeError(e)
+
+class MyListener(ambari_stomp.ConnectionListener):
+ def on_error(self, headers, body):
+ print('received an error "%s"' % body)
+
+ def on_message(self, headers, body):
+ print('received a message "%s"' % body)
\ No newline at end of file
diff --git a/ambari-common/src/main/python/ambari_stomp/transport.py b/ambari-common/src/main/python/ambari_stomp/transport.py
index 7c6a507155..184254849d 100644
--- a/ambari-common/src/main/python/ambari_stomp/transport.py
+++ b/ambari-common/src/main/python/ambari_stomp/transport.py
@@ -378,7 +378,7 @@ class BaseTransport(ambari_stomp.listener.Publisher):
c = b''
if c is None or len(c) == 0:
raise exception.ConnectionClosedException()
- if c.encode() == b'\x0a' and not self.__recvbuf and not fastbuf.tell():
+ if c == b'\x0a' and not self.__recvbuf and not fastbuf.tell():
#
# EOL to an empty receive buffer: treat as heartbeat.
# Note that this may misdetect an optional EOL at end of frame as heartbeat in case the
@@ -386,9 +386,9 @@ class BaseTransport(ambari_stomp.listener.Publisher):
# last byte of that read. But that should be harmless in practice.
#
fastbuf.close()
- return [c.encode()]
- fastbuf.write(c.encode())
- if b'\x00' in c.encode():
+ return [c]
+ fastbuf.write(c)
+ if b'\x00' in c:
#
# Possible end of frame
#
diff --git a/ambari-common/src/test/python/coilmq/util/frames.py b/ambari-common/src/test/python/coilmq/util/frames.py
index 3a3580e8b4..b50a7aba75 100644
--- a/ambari-common/src/test/python/coilmq/util/frames.py
+++ b/ambari-common/src/test/python/coilmq/util/frames.py
@@ -19,10 +19,11 @@ COMMIT = 'COMMIT'
ABORT = 'ABORT'
ACK = 'ACK'
NACK = 'NACK'
+STOMP_CMD = 'STOMP'
DISCONNECT = 'DISCONNECT'
-VALID_COMMANDS = ['message', 'connect', 'connected', 'error', 'send',
- 'subscribe', 'unsubscribe', 'begin', 'commit', 'abort', 'ack', 'disconnect', 'nack']
+VALID_COMMANDS = [MESSAGE, CONNECT, CONNECTED, ERROR, SEND,
+ SUBSCRIBE, UNSUBSCRIBE, BEGIN, COMMIT, ABORT, ACK, DISCONNECT, NACK, STOMP_CMD]
TEXT_PLAIN = 'text/plain'
@@ -43,7 +44,9 @@ def parse_headers(buff):
"""
Parses buffer and returns command and headers as strings
"""
- preamble_lines = list([six.u(x).decode() for x in iter(lambda: buff.readline().strip(), b'')]
+ preamble_lines = list(map(
+ lambda x: six.u(x).decode(),
+ iter(lambda: buff.readline().strip(), b''))
)
if not preamble_lines:
raise EmptyBuffer()
@@ -141,7 +144,7 @@ class ConnectedFrame(Frame):
@type session: C{str}
"""
super(ConnectedFrame, self).__init__(
- cmd='connected', headers=extra_headers or {})
+ cmd=CONNECTED, headers=extra_headers or {})
self.headers['session'] = session
@@ -196,7 +199,7 @@ class ErrorFrame(Frame):
@param body: The message body bytes.
@type body: C{str}
"""
- super(ErrorFrame, self).__init__(cmd='error',
+ super(ErrorFrame, self).__init__(cmd=ERROR,
headers=extra_headers or {},
body=body)
self.headers['message'] = message
self.headers[
@@ -354,5 +357,5 @@ class FrameBuffer(object):
raise StopIteration()
return msg
- def __next__(self):
- return self.__next__()
+ def next(self):
+ return self.__next__()
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]