This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4befb41c0933cbb5d41dd354927755e00ae343c6
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Aug 8 23:17:58 2021 +0800

    [C++/Python] Fix bugs that were not exposed by broken C++ CI before (#11557)
    
    Fixes #11551
    
    ### Motivation
    
    Currently there are some bugs in the C++ client, and some tests cannot pass:
    
    1. Introduced by #10601, which changed the behavior of the admin API for getting partition metadata, while the C++ implementation relies on the original behavior to create topics automatically. As a result, any test that uses HTTP lookup fails:
        - AuthPluginTest.testTlsDetectHttps
        - AuthPluginToken.testTokenWithHttpUrl
        - BasicEndToEndTest.testHandlerReconnectionLogic
        - BasicEndToEndTest.testV2TopicHttp
        - ClientDeduplicationTest.testProducerDeduplication
    2. Introduced by #11029 and #11486: the implementation iterates more than once even when only one valid IP address is resolved.
        - ClientTest.testConnectTimeout
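
    The HTTP lookup failure in item 1 comes down to the partition-metadata URL the client requests. A minimal Python sketch, under stated assumptions: `partition_metadata_url` is a hypothetical helper, and the `admin/v2/persistent/<tenant>/<namespace>/<topic>/partitions` path layout is assumed for illustration rather than copied from `HTTPLookupService.cc`. Only the `?checkAllowAutoCreation=true` suffix comes from this patch.

```python
def partition_metadata_url(admin_url, tenant, namespace, topic):
    """Build a v2 partition-metadata lookup URL for a persistent topic.

    The path layout is an assumption for illustration only.
    """
    base = admin_url.rstrip('/')
    url = '{}/admin/v2/persistent/{}/{}/{}/partitions'.format(base, tenant, namespace, topic)
    # The suffix added by this patch, so that HTTP lookup can create the
    # topic automatically again (per the commit message)
    return url + '?checkAllowAutoCreation=true'


print(partition_metadata_url('http://localhost:8080', 'public', 'default', 'my-topic'))
# http://localhost:8080/admin/v2/persistent/public/default/my-topic/partitions?checkAllowAutoCreation=true
```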
    
    In addition, there is a long-standing flaky test: BasicEndToEndTest.testLookupThrottling.
    
    Python tests are also broken, but because they only run after all C++ tests pass, these failures were not exposed either:
    1. Some tests in `pulsar_test.py` might hit a `Timeout` error when creating producers or consumers.
    2. Some tests in `schema_test.py` failed because comparisons between two `ComplexRecord` instances failed.
    
    Since the C++ client CI test could never fail after #10309 (to be fixed by #11575), no PR touching the C++ or Python client has actually been verified, even when CI passed. Before #11575 is merged, we need to fix all existing bugs in the C++ client.
    
    ### Modifications
    
    Corresponding to the test groups above, this PR makes the following modifications:
    1. Add the `?checkAllowAutoCreation=true` URL suffix so that HTTP lookup can create topics automatically.
    2. When iterating through a resolved IP list, advance the iterator first, then start the connection timer and try to connect to the next IP; if no IP is left, close the connection.
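
    The reordered loop in modification 2 can be modeled synchronously in Python. This is a simplified sketch, assuming a plain list of endpoints and a hypothetical `connect_fn` standing in for the asynchronous `socket_->async_connect` (the connection timer is left out). After a failed attempt the iterator is advanced first, and another attempt is made only if an endpoint remains; otherwise the connection is closed. With a single unreachable endpoint this yields exactly one attempt.

```python
def connect_with_fallback(endpoints, connect_fn):
    """Try each resolved endpoint at most once, in order.

    Returns (connected_endpoint or None, number_of_attempts).
    connect_fn is a hypothetical stand-in for socket_->async_connect.
    """
    it = iter(endpoints)
    attempts = 0
    endpoint = next(it, None)  # the first resolved endpoint, if any
    while endpoint is not None:
        attempts += 1
        if connect_fn(endpoint):
            return endpoint, attempts
        # On failure: advance the iterator FIRST, then check whether anything is left
        endpoint = next(it, None)
    # Nothing left to try; the patched C++ code calls close() at this point
    return None, attempts


print(connect_with_fallback(['10.0.0.1:6650'], lambda ep: False))  # (None, 1)
```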
    
    Regarding the flaky `testLookupThrottling`, this PR adds a `client.close()` at the end of the test and fixes the `ClientImpl::close` implementation. Before this PR, if a client had no producers or consumers, the `close()` method would not call `shutdown()` to close the connection pool and executors; `shutdown()` was only called after the `Client` instance was destructed. To fix this, this PR calls `handleClose` instead of invoking the callback directly. In addition, chang [...]
    
    This PR also fixes the Python tests that failed with timeouts: some were caused by incorrect class imports, others by a `client` that was not closed.
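
    The test-side pattern (a per-run topic name plus always closing the client) can be sketched with `unittest`. The patch itself appends a trailing `client.close()`; the sketch below swaps in `TestCase.addCleanup`, which still runs the close when an earlier assertion fails. `FakeClient` and `timestamped_topic` are hypothetical helpers standing in for `pulsar.Client` and the inline `str(time.time())` suffix used in the patched tests.

```python
import time
import unittest


class FakeClient(object):
    """Hypothetical stand-in for pulsar.Client that only records close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def timestamped_topic(prefix):
    # Mirrors the patched tests: append time.time() so each run gets its own
    # topic instead of reusing one left over from an earlier run
    return prefix + '-' + str(time.time())


class SeekExampleTest(unittest.TestCase):
    def test_uses_fresh_topic(self):
        client = FakeClient()
        # Registered cleanups run after the test even if an assertion fails,
        # unlike a close() call placed at the end of the test body
        self.addCleanup(client.close)
        topic = timestamped_topic('my-python-topic-seek')
        self.assertTrue(topic.startswith('my-python-topic-seek-'))
```

    Running `SeekExampleTest` with any `unittest` runner exercises the pattern.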
    
    Regarding the Python schema tests: in Python 2, `self.__ne__(other)` is not equivalent to `not self.__eq__(other)` when the default `__eq__` implementation is overridden. If a `Record` object has a field whose type is also `Record`, the `Record.__ne__` method is called to compare that field; see

    https://github.com/apache/pulsar/blob/ddb5fb0e062c2fe0967efce2a443a31f9cd12c07/pulsar-client-cpp/python/pulsar/schema/definition.py#L138-L139
    
    However, `Record.__ne__` falls back to the default implementation to check whether the two objects are not equal, so the custom `__eq__` method is never called. Therefore, this PR implements `Record.__ne__` explicitly by delegating to `Record.__eq__`, so that the comparison also works in Python 2.
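
    A stripped-down Python sketch of the fix follows. `Record` here is a hypothetical stand-in for `pulsar.schema.Record`; its `__eq__` compares fields with `!=`, mirroring what the linked lines appear to do, so a nested `Record` field goes through `__ne__`. Defining `__ne__` as `not self.__eq__(other)` makes that path use the custom equality in Python 2 as well; Python 3 already derives `__ne__` from `__eq__` by default.

```python
class Record(object):
    """Hypothetical, stripped-down stand-in for pulsar.schema.Record."""

    def __init__(self, **fields):
        self.__dict__.update(fields)

    def __eq__(self, other):
        if not isinstance(other, Record):
            return False
        if set(self.__dict__) != set(other.__dict__):
            return False
        for name in self.__dict__:
            # Fields are compared with !=, so a nested Record field calls __ne__
            if getattr(self, name) != getattr(other, name):
                return False
        return True

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__; delegate explicitly
        return not self.__eq__(other)


inner_a = Record(x=1)
inner_b = Record(x=1)
print(Record(inner=inner_a, y=2) == Record(inner=inner_b, y=2))  # True
```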
    
    ### Verifying this change
    
    We can only check the workflow output to verify this change.
    
    (cherry picked from commit 4919a82cef71232a174799415ed12c1c6155600a)
---
 pulsar-client-cpp/lib/ClientConnection.cc          | 39 +++++++++++++++-------
 pulsar-client-cpp/lib/ClientImpl.cc                |  2 +-
 pulsar-client-cpp/lib/ConnectionPool.cc            |  2 +-
 pulsar-client-cpp/lib/HTTPLookupService.cc         |  1 +
 .../python/pulsar/schema/definition.py             |  3 ++
 pulsar-client-cpp/python/pulsar_test.py            | 34 ++++++++++++-------
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  3 ++
 pulsar-client-cpp/tests/CustomLoggerTest.cc        |  4 +--
 8 files changed, 60 insertions(+), 28 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 758b67e..ac228b1 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -187,6 +187,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
       numOfPendingLookupRequest_(0),
       isTlsAllowInsecureConnection_(false) {
+    LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
     if (clientConfiguration.isUseTls()) {
 #if BOOST_VERSION >= 105400
         boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
@@ -416,21 +417,28 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
             handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
         }
     } else if (endpointIterator != tcp::resolver::iterator()) {
+        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
         // The connection failed. Try the next endpoint in the list.
-        boost::system::error_code err;
-        socket_->close(err);  // ignore the error of close
-        if (err) {
+        boost::system::error_code closeError;
+        socket_->close(closeError);  // ignore the error of close
+        if (closeError) {
             LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
         }
         connectTimeoutTask_->stop();
-        connectTimeoutTask_->start();
-        tcp::endpoint endpoint = *endpointIterator;
-        socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
-                                                   std::placeholders::_1, ++endpointIterator));
+        ++endpointIterator;
+        if (endpointIterator != tcp::resolver::iterator()) {
+            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
+            connectTimeoutTask_->start();
+            tcp::endpoint endpoint = *endpointIterator;
+            socket_->async_connect(endpoint,
+                                   std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
+                                             std::placeholders::_1, ++endpointIterator));
+        } else {
+            close();
+        }
     } else {
         LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
         close();
-        return;
     }
 }
 
@@ -495,7 +503,7 @@ void ClientConnection::tcpConnectAsync() {
         return;
     }
 
-    LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port());
+    LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
     tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
     resolver_->async_resolve(query, std::bind(&ClientConnection::handleResolve, shared_from_this(),
                                               std::placeholders::_1, std::placeholders::_2));
@@ -514,12 +522,16 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
         if (state_ != TcpConnected) {
             LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
                                  << " ms, close the socket");
-            PeriodicTask::ErrorCode ignoredError;
-            socket_->close(ignoredError);
+            PeriodicTask::ErrorCode err;
+            socket_->close(err);
+            if (err) {
+                LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+            }
         }
         connectTimeoutTask_->stop();
     });
 
+    LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
     connectTimeoutTask_->start();
     if (endpointIterator != tcp::resolver::iterator()) {
         LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name()  //
@@ -1438,7 +1450,10 @@ void ClientConnection::close() {
     }
 
     if (tlsSocket_) {
-        tlsSocket_->lowest_layer().close();
+        tlsSocket_->lowest_layer().close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
+        }
     }
 
     if (executor_) {
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 02099a5..feff610 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -507,7 +507,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
     }
 
     if (*numberOfOpenHandlers == 0 && callback) {
-        callback(ResultOk);
+        handleClose(ResultOk, numberOfOpenHandlers, callback);
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index cc5668b..bb4f5c2 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -46,7 +46,7 @@ void ConnectionPool::close() {
     if (poolConnections_) {
         for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
             ClientConnectionPtr cnx = cnxIt->second.lock();
-            if (cnx && !cnx->isClosed()) {
+            if (cnx) {
                 cnx->close();
             }
         }
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 72b7a44..e881ac4 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -106,6 +106,7 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync
                           << '/' << PARTITION_METHOD_NAME;
     }
 
+    completeUrlStream << "?checkAllowAutoCreation=true";
     executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
                                                  shared_from_this(), promise, completeUrlStream.str(),
                                                  PartitionMetaData));
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index dcb2d83..41c094d 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -139,6 +139,9 @@ class Record(with_metaclass(RecordMeta, object)):
                 return False
         return True
 
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
     def __str__(self):
         return str(self.__dict__)
 
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index fc17c72..5810745 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -23,6 +23,7 @@ import logging
 from unittest import TestCase, main
 import time
 import os
+import pulsar
 import uuid
 from datetime import timedelta
 from pulsar import Client, MessageId, \
@@ -30,7 +31,7 @@ from pulsar import Client, MessageId, \
             AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
             CryptoKeyReader
 
-from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError
+from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
 from schema_test import *
 
@@ -155,6 +156,7 @@ class PulsarTest(TestCase):
         consumer.acknowledge(msg)
         print('receive from {}'.format(msg.message_id()))
         self.assertEqual(msg_id, msg.message_id())
+        client.close()
 
     def test_producer_consumer(self):
         client = Client(self.serviceUrl)
@@ -292,7 +294,7 @@ class PulsarTest(TestCase):
                                     subscription_name='my-subscription',
                                     schema=pulsar.schema.StringSchema())
         producer = client.create_producer(topic=topic,
-                                          schema=StringSchema())
+                                          schema=pulsar.schema.StringSchema())
         producer.send('hello',
                       properties={
                           'a': '1',
@@ -319,10 +321,11 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))
 
-        consumer = client.subscribe('my-python-topic-tls-auth',
+        topic = 'my-python-topic-tls-auth-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -346,10 +349,11 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-tls-auth-2',
+        topic = 'my-python-topic-tls-auth-2-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth-2')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -392,10 +396,11 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-tls-auth-3',
+        topic = 'my-python-topic-tls-auth-3-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-tls-auth-3')
+        producer = client.create_producer(topic)
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
@@ -583,6 +588,8 @@ class PulsarTest(TestCase):
             producer.send(b'hello-%d' % i)
             self.assertEqual(producer.last_sequence_id(), i)
 
+        client.close()
+
         doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
                    'false')
 
@@ -630,6 +637,8 @@ class PulsarTest(TestCase):
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
 
+        client.close()
+
         doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
                    'false')
 
@@ -820,10 +829,11 @@ class PulsarTest(TestCase):
 
     def test_seek(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-seek',
+        topic = 'my-python-topic-seek-' + str(time.time())
+        consumer = client.subscribe(topic,
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-seek')
+        producer = client.create_producer(topic)
 
         for i in range(100):
             if i > 0:
@@ -858,7 +868,7 @@ class PulsarTest(TestCase):
         self.assertEqual(msg.data(), b'hello-42')
 
         # repeat with reader
-        reader = client.create_reader('my-python-topic-seek', MessageId.latest)
+        reader = client.create_reader(topic, MessageId.latest)
         with self.assertRaises(pulsar.Timeout):
             reader.read_next(100)
 
@@ -1157,7 +1167,7 @@ class PulsarTest(TestCase):
         try:
             producer = client.create_producer('test_connect_timeout')
             self.fail('create_producer should not succeed')
-        except ConnectError as expected:
+        except pulsar.ConnectError as expected:
             print('expected error: {} when create producer'.format(expected))
         t2 = time.time()
         self.assertGreater(t2 - t1, 1.0)
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 2f8524e..8e7eb6b 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -298,6 +298,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
     std::string topicName = "testLookupThrottling";
     ClientConfiguration config;
     config.setConcurrentLookupRequest(0);
+    config.setLogger(new ConsoleLoggerFactory(Logger::LEVEL_DEBUG));
     Client client(lookupUrl, config);
 
     Producer producer;
@@ -307,6 +308,8 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
     Consumer consumer1;
     result = client.subscribe(topicName, "my-sub-name", consumer1);
     ASSERT_EQ(ResultTooManyLookupRequestException, result);
+
+    client.close();
 }
 
 TEST(BasicEndToEndTest, testNonExistingTopic) {
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index f2a97d1..ec83e42 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
         // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 2);
+        ASSERT_EQ(logLines.size(), 3);
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
@@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 2);
+    ASSERT_EQ(logLines.size(), 3);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {
