This is an automated email from the ASF dual-hosted git repository. jialiang pushed a commit to branch revert-3798-AMBARI-26099 in repository https://gitbox.apache.org/repos/asf/ambari.git
commit afa12d84c0ac739085982fe7bf02835789f7b90b Author: jialiang <[email protected]> AuthorDate: Fri Aug 16 13:42:04 2024 +0800 Revert "AMBARI-26099. Ambari Common Stomp Module AttributeError (#3798)" This reverts commit 69ed31c19c32531621056f0a55b16975e911ca2f. --- .../test/python/ambari_agent/TestAmbariStomp.py | 41 ---------------------- .../src/main/python/ambari_stomp/transport.py | 8 ++--- .../src/test/python/coilmq/util/frames.py | 17 ++++----- 3 files changed, 11 insertions(+), 55 deletions(-) diff --git a/ambari-agent/src/test/python/ambari_agent/TestAmbariStomp.py b/ambari-agent/src/test/python/ambari_agent/TestAmbariStomp.py deleted file mode 100644 index 021cfedfce..0000000000 --- a/ambari-agent/src/test/python/ambari_agent/TestAmbariStomp.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -from BaseStompServerTestCase import BaseStompServerTestCase -import ambari_stomp - -class TestAmbariStomp(BaseStompServerTestCase): - def test_ambari_stomp(self): - conn = ambari_stomp.Connection10([('127.0.0.1', 21613)]) - conn.set_listener('', MyListener()) - conn.start() - try: - conn.connect("test", "test", wait=False) - conn.subscribe(destination='/queue/test', id=1, ack='auto') - conn.send(body='some text', destination='/queue/test', headers={"test": 'aaa', "test2": "bbb"}) - except AttributeError as e: - raise AttributeError(e) - -class MyListener(ambari_stomp.ConnectionListener): - def on_error(self, headers, body): - print('received an error "%s"' % body) - - def on_message(self, headers, body): - print('received a message "%s"' % body) \ No newline at end of file diff --git a/ambari-common/src/main/python/ambari_stomp/transport.py b/ambari-common/src/main/python/ambari_stomp/transport.py index 184254849d..7c6a507155 100644 --- a/ambari-common/src/main/python/ambari_stomp/transport.py +++ b/ambari-common/src/main/python/ambari_stomp/transport.py @@ -378,7 +378,7 @@ class BaseTransport(ambari_stomp.listener.Publisher): c = b'' if c is None or len(c) == 0: raise exception.ConnectionClosedException() - if c == b'\x0a' and not self.__recvbuf and not fastbuf.tell(): + if c.encode() == b'\x0a' and not self.__recvbuf and not fastbuf.tell(): # # EOL to an empty receive buffer: treat as heartbeat. # Note that this may misdetect an optional EOL at end of frame as heartbeat in case the @@ -386,9 +386,9 @@ class BaseTransport(ambari_stomp.listener.Publisher): # last byte of that read. But that should be harmless in practice. # fastbuf.close() - return [c] - fastbuf.write(c) - if b'\x00' in c: + return [c.encode()] + fastbuf.write(c.encode()) + if b'\x00' in c.encode(): # # Possible end of frame # diff --git a/ambari-common/src/test/python/coilmq/util/frames.py b/ambari-common/src/test/python/coilmq/util/frames.py index b50a7aba75..3a3580e8b4 100644 --- a/ambari-common/src/test/python/coilmq/util/frames.py +++ b/ambari-common/src/test/python/coilmq/util/frames.py @@ -19,11 +19,10 @@ COMMIT = 'COMMIT' ABORT = 'ABORT' ACK = 'ACK' NACK = 'NACK' -STOMP_CMD = 'STOMP' DISCONNECT = 'DISCONNECT' -VALID_COMMANDS = [MESSAGE, CONNECT, CONNECTED, ERROR, SEND, - SUBSCRIBE, UNSUBSCRIBE, BEGIN, COMMIT, ABORT, ACK, DISCONNECT, NACK, STOMP_CMD] +VALID_COMMANDS = ['message', 'connect', 'connected', 'error', 'send', + 'subscribe', 'unsubscribe', 'begin', 'commit', 'abort', 'ack', 'disconnect', 'nack'] TEXT_PLAIN = 'text/plain' @@ -44,9 +43,7 @@ def parse_headers(buff): """ Parses buffer and returns command and headers as strings """ - preamble_lines = list(map( - lambda x: six.u(x).decode(), - iter(lambda: buff.readline().strip(), b'')) + preamble_lines = list([six.u(x).decode() for x in iter(lambda: buff.readline().strip(), b'')] ) if not preamble_lines: raise EmptyBuffer() @@ -144,7 +141,7 @@ class ConnectedFrame(Frame): @type session: C{str} """ super(ConnectedFrame, self).__init__( - cmd=CONNECTED, headers=extra_headers or {}) + cmd='connected', headers=extra_headers or {}) self.headers['session'] = session @@ -199,7 +196,7 @@ class ErrorFrame(Frame): @param body: The message body bytes. @type body: C{str} """ - super(ErrorFrame, self).__init__(cmd=ERROR, + super(ErrorFrame, self).__init__(cmd='error', headers=extra_headers or {}, body=body) self.headers['message'] = message self.headers[ @@ -357,5 +354,5 @@ class FrameBuffer(object): raise StopIteration() return msg - def next(self): - return self.__next__() \ No newline at end of file + def __next__(self): + return self.__next__() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
