This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4befb41c0933cbb5d41dd354927755e00ae343c6
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Aug 8 23:17:58 2021 +0800

[C++/Python] Fix bugs that were not exposed by broken C++ CI before (#11557)

Fixes #11551

### Motivation

Currently there are some bugs in the C++ client, and some tests cannot pass:

1. Introduced by #10601, which changed the behavior of the admin API for getting partition metadata, while the C++ implementation relies on the original behavior to create topics automatically. As a result, any test that uses HTTP lookup fails:
   - AuthPluginTest.testTlsDetectHttps
   - AuthPluginToken.testTokenWithHttpUrl
   - BasicEndToEndTest.testHandlerReconnectionLogic
   - BasicEndToEndTest.testV2TopicHttp
   - ClientDeduplicationTest.testProducerDeduplication
2. Introduced by #11029 and #11486: the implementation iterates more than once even if there is only one valid resolved IP address.
   - ClientTest.testConnectTimeout

In addition, there is a long-standing flaky test: BasicEndToEndTest.testLookupThrottling.

Python tests are also broken. Because they run only after all C++ tests have passed, these failures were not exposed either.

1. Some tests in `pulsar_test.py` may hit a `Timeout` error when creating producers or consumers.
2. Some tests in `schema_test.py` fail because some comparisons between two `ComplexRecord`s fail.

Since the CI test of the C++ client could never fail after #10309 (to be fixed by #11575), no PR touching the C++ or Python client has actually been verified, even if CI passed. Before #11575 is merged, we need to fix all existing bugs in the C++ client.

### Modifications

Corresponding to the test groups above, this PR makes the following modifications:

1. Add the `?checkAllowAutoCreation=true` URL suffix to allow HTTP lookup to create topics automatically.
2. When iterating through a resolved IP list, advance the iterator first, then start the connection timer and try to connect to the next IP.
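The ordering in item 2 can be sketched with a small model. This is only an illustration under stated assumptions, not the client's code: `attempted_endpoints` is a hypothetical helper, every connection attempt is assumed to fail, and the "check first, advance later" branch is one plausible reading of the behavior the item describes.

```python
def attempted_endpoints(endpoints, advance_first):
    """Return the endpoints tried, in order, when every attempt fails.

    `endpoints` must be non-empty. `position` plays the role of the
    iterator handed to the failure handler: it starts at the endpoint
    that was just tried.
    """
    attempts = [endpoints[0]]  # first attempt, made right after resolution
    position = 0
    while True:
        if advance_first:
            # Fixed ordering: move past the failed endpoint before checking.
            position += 1
        if position == len(endpoints):
            break  # nothing left to try: give up (the real client closes)
        attempts.append(endpoints[position])
        if not advance_first:
            # Old ordering: the failed endpoint was retried before advancing.
            position += 1
    return attempts


if __name__ == "__main__":
    print(attempted_endpoints(["10.0.0.1"], advance_first=False))
    print(attempted_endpoints(["10.0.0.1"], advance_first=True))
```

In this model, a single resolved address is tried twice when the check comes first and once when the iterator is advanced first, which is why the order matters for a connect-timeout test.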

Regarding the flaky `testLookupThrottling`, this PR adds a `client.close()` at the end of the test and fixes the `ClientImpl::close` implementation. Before this PR, if a client had no producers or consumers, the `close()` method would not call `shutdown()` to close the connection pool and executors. The `shutdown()` method was called only when the `Client` instance was destructed. For this case, this PR calls `handleClose` instead of invoking the callback directly. In addition, chang [...]

This PR also fixes the Python tests that failed with timeouts: some were caused by incorrect imports of classes, others by the `client` not being closed.

Regarding the Python schema tests: in Python 2, `self.__ne__(other)` is not equivalent to `not self.__eq__(other)` when the default `__eq__` implementation is overridden. If a `Record` object has a field whose type is also `Record`, the `Record.__ne__` method is called (see https://github.com/apache/pulsar/blob/ddb5fb0e062c2fe0967efce2a443a31f9cd12c07/pulsar-client-cpp/python/pulsar/schema/definition.py#L138-L139), but it just uses the default implementation to check whether the two objects are not equal, so the custom `__eq__` method is never called. Therefore, this PR implements `Record.__ne__` explicitly to call `Record.__eq__`, so that the comparison works on Python 2.

### Verifying this change

We can only check the workflow output to verify this change.
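
The `__ne__` point made under Modifications can be illustrated with a minimal sketch. `SimpleRecord` is a hypothetical stand-in, not the `Record` class from `definition.py`. It only shows why a field-by-field `__eq__` that compares nested values with `!=` needs an explicit `__ne__` on Python 2; on Python 3 the default `__ne__` already inverts `__eq__`, so the extra method is harmless there.

```python
class SimpleRecord(object):
    """Hypothetical stand-in for a schema record (illustration only)."""

    def __init__(self, **fields):
        self.__dict__.update(fields)

    def __eq__(self, other):
        if not isinstance(other, SimpleRecord):
            return False
        if set(self.__dict__) != set(other.__dict__):
            return False
        for name, value in self.__dict__.items():
            # For a nested SimpleRecord field, `!=` dispatches to __ne__.
            if value != other.__dict__[name]:
                return False
        return True

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from a custom __eq__,
        # so delegate explicitly.
        return not self.__eq__(other)


if __name__ == "__main__":
    a = SimpleRecord(x=1, inner=SimpleRecord(y=2))
    b = SimpleRecord(x=1, inner=SimpleRecord(y=2))
    print(a == b, a != b)
```

Without the explicit `__ne__`, Python 2 would compare the two distinct `inner` objects by identity inside the loop, so `a == b` would be false even though all fields match.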
(cherry picked from commit 4919a82cef71232a174799415ed12c1c6155600a)
---
 pulsar-client-cpp/lib/ClientConnection.cc          | 39 +++++++++++++++-------
 pulsar-client-cpp/lib/ClientImpl.cc                |  2 +-
 pulsar-client-cpp/lib/ConnectionPool.cc            |  2 +-
 pulsar-client-cpp/lib/HTTPLookupService.cc         |  1 +
 .../python/pulsar/schema/definition.py             |  3 ++
 pulsar-client-cpp/python/pulsar_test.py            | 34 ++++++++++++-------
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  3 ++
 pulsar-client-cpp/tests/CustomLoggerTest.cc        |  4 +--
 8 files changed, 60 insertions(+), 28 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 758b67e..ac228b1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -187,6 +187,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
       numOfPendingLookupRequest_(0),
       isTlsAllowInsecureConnection_(false) {
+    LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
     if (clientConfiguration.isUseTls()) {
 #if BOOST_VERSION >= 105400
         boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
@@ -416,21 +417,28 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
             handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
         }
     } else if (endpointIterator != tcp::resolver::iterator()) {
+        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
         // The connection failed. Try the next endpoint in the list.
-        boost::system::error_code err;
-        socket_->close(err);  // ignore the error of close
-        if (err) {
+        boost::system::error_code closeError;
+        socket_->close(closeError);  // ignore the error of close
+        if (closeError) {
             LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
         }
         connectTimeoutTask_->stop();
-        connectTimeoutTask_->start();
-        tcp::endpoint endpoint = *endpointIterator;
-        socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
-                                                   std::placeholders::_1, ++endpointIterator));
+        ++endpointIterator;
+        if (endpointIterator != tcp::resolver::iterator()) {
+            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
+            connectTimeoutTask_->start();
+            tcp::endpoint endpoint = *endpointIterator;
+            socket_->async_connect(endpoint,
+                                   std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
+                                             std::placeholders::_1, ++endpointIterator));
+        } else {
+            close();
+        }
     } else {
         LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
         close();
-        return;
     }
 }
 
@@ -495,7 +503,7 @@ void ClientConnection::tcpConnectAsync() {
         return;
     }
 
-    LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port());
+    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
     tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
     resolver_->async_resolve(query, std::bind(&ClientConnection::handleResolve, shared_from_this(),
                                               std::placeholders::_1, std::placeholders::_2));
@@ -514,12 +522,16 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
             if (state_ != TcpConnected) {
                 LOG_ERROR(cnxString_ << "Connection was not established in "
                                      << connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
-                PeriodicTask::ErrorCode ignoredError;
-                socket_->close(ignoredError);
+                PeriodicTask::ErrorCode err;
+                socket_->close(err);
+                if (err) {
+                    LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+                }
             }
             connectTimeoutTask_->stop();
         });
 
+    LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
     connectTimeoutTask_->start();
     if (endpointIterator != tcp::resolver::iterator()) {
         LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name()  //
@@ -1438,7 +1450,10 @@ void ClientConnection::close() {
     }
 
     if (tlsSocket_) {
-        tlsSocket_->lowest_layer().close();
+        tlsSocket_->lowest_layer().close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
+        }
     }
 
     if (executor_) {
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 02099a5..feff610 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -507,7 +507,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
     }
 
     if (*numberOfOpenHandlers == 0 && callback) {
-        callback(ResultOk);
+        handleClose(ResultOk, numberOfOpenHandlers, callback);
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index cc5668b..bb4f5c2 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -46,7 +46,7 @@ void ConnectionPool::close() {
     if (poolConnections_) {
         for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
             ClientConnectionPtr cnx = cnxIt->second.lock();
-            if (cnx && !cnx->isClosed()) {
+            if (cnx) {
                 cnx->close();
             }
         }
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 72b7a44..e881ac4 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -106,6 +106,7 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync
                           << '/' << PARTITION_METHOD_NAME;
     }
 
+    completeUrlStream << "?checkAllowAutoCreation=true";
     executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
                                                  shared_from_this(), promise, completeUrlStream.str(),
                                                  PartitionMetaData));
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index dcb2d83..41c094d 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -139,6 +139,9 @@ class Record(with_metaclass(RecordMeta, object)):
                 return False
         return True
 
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
     def __str__(self):
         return str(self.__dict__)
 
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index fc17c72..5810745 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -23,6 +23,7 @@ import logging
 from unittest import TestCase, main
 import time
 import os
+import pulsar
 import uuid
 from datetime import timedelta
 from pulsar import Client, MessageId, \
@@ -30,7 +31,7 @@ from pulsar import Client, MessageId, \
     AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
     CryptoKeyReader
 
-from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError
+from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
 from schema_test import *
 
@@ -155,6 +156,7 @@ class PulsarTest(TestCase):
         consumer.acknowledge(msg)
         print('receive from {}'.format(msg.message_id()))
         self.assertEqual(msg_id, msg.message_id())
+        client.close()
 
     def test_producer_consumer(self):
         client = Client(self.serviceUrl)
@@ -292,7 +294,7 @@ class PulsarTest(TestCase):
                                     subscription_name='my-subscription',
                                     schema=pulsar.schema.StringSchema())
         producer = client.create_producer(topic=topic,
-                                          schema=StringSchema())
+                                          schema=pulsar.schema.StringSchema())
         producer.send('hello',
                       properties={
                           'a': '1',
@@ -319,10 +321,11 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))
 
-        consumer = client.subscribe('my-python-topic-tls-auth',
+        topic = 'my-python-topic-tls-auth-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -346,10 +349,11 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-tls-auth-2',
+        topic = 'my-python-topic-tls-auth-2-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth-2')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -392,10 +396,11 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-tls-auth-3',
+        topic = 'my-python-topic-tls-auth-3-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth-3')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -583,6 +588,8 @@ class PulsarTest(TestCase):
             producer.send(b'hello-%d' % i)
             self.assertEqual(producer.last_sequence_id(), i)
 
+        client.close()
+
         doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
                    'false')
 
@@ -630,6 +637,8 @@ class PulsarTest(TestCase):
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
 
+        client.close()
+
         doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
                    'false')
 
@@ -820,10 +829,11 @@ class PulsarTest(TestCase):
 
     def test_seek(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-seek',
+        topic = 'my-python-topic-seek-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-seek')
+        producer = client.create_producer(topic)
 
         for i in range(100):
             if i > 0:
@@ -858,7 +868,7 @@ class PulsarTest(TestCase):
         self.assertEqual(msg.data(), b'hello-42')
 
         # repeat with reader
-        reader = client.create_reader('my-python-topic-seek', MessageId.latest)
+        reader = client.create_reader(topic, MessageId.latest)
         with self.assertRaises(pulsar.Timeout):
             reader.read_next(100)
 
@@ -1157,7 +1167,7 @@ class PulsarTest(TestCase):
         try:
             producer = client.create_producer('test_connect_timeout')
             self.fail('create_producer should not succeed')
-        except ConnectError as expected:
+        except pulsar.ConnectError as expected:
             print('expected error: {} when create producer'.format(expected))
         t2 = time.time()
         self.assertGreater(t2 - t1, 1.0)
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 2f8524e..8e7eb6b 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -298,6 +298,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
     std::string topicName = "testLookupThrottling";
     ClientConfiguration config;
     config.setConcurrentLookupRequest(0);
+    config.setLogger(new ConsoleLoggerFactory(Logger::LEVEL_DEBUG));
     Client client(lookupUrl, config);
 
     Producer producer;
@@ -307,6 +308,8 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
     Consumer consumer1;
     result = client.subscribe(topicName, "my-sub-name", consumer1);
     ASSERT_EQ(ResultTooManyLookupRequestException, result);
+
+    client.close();
 }
 
 TEST(BasicEndToEndTest, testNonExistingTopic) {
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index f2a97d1..ec83e42 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
         // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 2);
+        ASSERT_EQ(logLines.size(), 3);
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
@@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 2);
+    ASSERT_EQ(logLines.size(), 3);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {
