This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4919a82 [C++/Python] Fix bugs that were not exposed by broken C++ CI
before (#11557)
4919a82 is described below
commit 4919a82cef71232a174799415ed12c1c6155600a
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Aug 8 23:17:58 2021 +0800
[C++/Python] Fix bugs that were not exposed by broken C++ CI before (#11557)
Fixes #11551
### Motivation
The C++ client currently has several bugs, and some tests cannot pass:
1. Introduced by #10601, which changed the behavior of the admin API for
getting partition metadata, while the C++ implementation relies on the original
behavior to create topics automatically. As a result, any test that uses HTTP
lookup fails.
- AuthPluginTest.testTlsDetectHttps
- AuthPluginToken.testTokenWithHttpUrl
- BasicEndToEndTest.testHandlerReconnectionLogic
- BasicEndToEndTest.testV2TopicHttp
- ClientDeduplicationTest.testProducerDeduplication
2. Introduced by #11029 and #11486: the implementation iterates more than
once even when there is only one valid resolved IP address.
- ClientTest.testConnectTimeout
In addition, there is a long-standing flaky test:
BasicEndToEndTest.testLookupThrottling.
Python tests are also broken. Because they only run after all C++ tests have
passed, their failures were not exposed either.
1. Some tests in `pulsar_test.py` may encounter a `Timeout` error when
creating producers or consumers.
2. Some tests in `schema_test.py` fail because some comparisons between
two `ComplexRecord`s fail.
Since the C++ client CI could never fail after #10309 (to be fixed by
#11575), PRs that touch the C++ or Python client have not actually been
verified even when CI passed. Before #11575 is merged, we need to fix all
existing bugs in the C++ client.
### Modifications
Corresponding to the test groups above, this PR makes the following
modifications:
1. Add the `?checkAllowAutoCreation=true` URL suffix to allow HTTP lookup
to create topics automatically.
2. When iterating through a resolved IP list, advance the iterator first,
then start the connection timer and try to connect to the next IP.
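The ordering in item 2 can be illustrated with a minimal Python sketch. It is not the C++ client's code: `Connector`, `try_connect`, and the sample addresses are hypothetical names used only to show why the position is advanced before the timer is restarted, so that a list with a single resolved address yields exactly one attempt.

```python
class Connector(object):
    """Toy model of endpoint failover; all names here are hypothetical."""

    def __init__(self, endpoints, try_connect):
        self.endpoints = list(endpoints)
        self.try_connect = try_connect  # assumed callable: True on success
        self.attempts = []              # endpoints we actually tried
        self.timer_starts = 0           # how often the connect timer was started
        self.closed = False

    def connect(self):
        if not self.endpoints:
            self.closed = True
            return None
        return self._attempt(0)

    def _attempt(self, index):
        # Start the timer only when there is a real endpoint to try.
        self.timer_starts += 1
        endpoint = self.endpoints[index]
        self.attempts.append(endpoint)
        if self.try_connect(endpoint):
            return endpoint
        # Failure: advance first, then decide whether to retry or give up.
        index += 1
        if index < len(self.endpoints):
            return self._attempt(index)
        self.closed = True
        return None
```

With a single unreachable address, `Connector(["10.0.0.1:6650"], lambda ep: False).connect()` makes one attempt, starts the timer once, and then closes instead of looping again.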
Regarding the flaky `testLookupThrottling`, this PR adds a
`client.close()` at the end of the test and fixes the `ClientImpl::close`
implementation. Before this PR, if a client had no producers or consumers, the
`close()` method did not call `shutdown()` to close the connection pool and
executors; `shutdown()` was only called when the `Client` instance was
destructed. For this case, this PR calls `handleClose` instead of invoking the
callback directly. In addition, chang [...]
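The intent of the `close()` fix can be sketched in Python as follows. This is a simplified model under stated assumptions, not the real `ClientImpl`: `SketchClient`, its `handlers` list, and the `"ResultOk"` string are hypothetical, and it assumes that the close handler is where `shutdown()` gets triggered.

```python
class SketchClient(object):
    """Toy stand-in for a client owning a connection pool and executors."""

    def __init__(self, handlers=()):
        # Hypothetical producers/consumers, modelled as close callables.
        self.handlers = list(handlers)
        self.shutdown_called = False

    def shutdown(self):
        # Stands in for closing the connection pool and executors.
        self.shutdown_called = True

    def _handle_close(self, result, callback):
        # Single exit path: always release resources, then notify the caller.
        self.shutdown()
        callback(result)

    def close(self, callback):
        for close_handler in self.handlers:
            close_handler()
        del self.handlers[:]
        # Even with zero handlers, go through _handle_close rather than
        # calling callback(result) directly, so shutdown() is not skipped.
        self._handle_close("ResultOk", callback)
```

In this model, closing a client that never created a producer or consumer still ends with `shutdown_called` set to `True`.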
This PR also fixes the Python tests that failed with timeouts: some were
caused by incorrect class imports, others by a `client` that was not closed.
Regarding the Python schema tests: in Python2, `self.__ne__(other)` is not
equivalent to `not self.__eq__(other)` when the default `__eq__` implementation
is overridden. If a `Record` object has a field whose type is also `Record`,
the `Record.__ne__` method will be called, see
https://github.com/apache/pulsar/blob/ddb5fb0e062c2fe0967efce2a443a31f9cd12c07/pulsar-client-cpp/python/pulsar/schema/definition.py#L138-L139
but it just uses the default implementation to check whether they are not
equal, so the custom `__eq__` method is never called. Therefore, this PR
implements `Record.__ne__` explicitly to call `Record.__eq__` so that the
comparison works in Python2.
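A minimal sketch of the pattern, assuming a simplified stand-in class (`SketchRecord` is hypothetical and much smaller than the real `Record`). Its `__eq__` compares fields with `!=`, so nested records go through `__ne__`; defining `__ne__` in terms of `__eq__` makes that comparison behave the same on Python2 and Python3.

```python
class SketchRecord(object):
    """Hypothetical, simplified stand-in for a schema record."""

    def __init__(self, **fields):
        self.__dict__.update(fields)

    def __eq__(self, other):
        if not isinstance(other, SketchRecord):
            return False
        if set(self.__dict__) != set(other.__dict__):
            return False
        for name in self.__dict__:
            # `!=` dispatches to __ne__ when the field is itself a record.
            if getattr(self, name) != getattr(other, name):
                return False
        return True

    def __ne__(self, other):
        # Python2 does not derive __ne__ from __eq__, so state it explicitly.
        return not self.__eq__(other)


outer_a = SketchRecord(inner=SketchRecord(x=1))
outer_b = SketchRecord(inner=SketchRecord(x=1))
print(outer_a == outer_b)
```

Without the explicit `__ne__`, Python2 would fall back to the default inequality check for the nested field, so two structurally equal inner records would be reported as different.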
### Verifying this change
We can only check the workflow output to verify this change.
---
pulsar-client-cpp/lib/ClientConnection.cc | 39 +++++++++++++++-------
pulsar-client-cpp/lib/ClientImpl.cc | 2 +-
pulsar-client-cpp/lib/ConnectionPool.cc | 2 +-
pulsar-client-cpp/lib/HTTPLookupService.cc | 1 +
.../python/pulsar/schema/definition.py | 3 ++
pulsar-client-cpp/python/pulsar_test.py | 34 ++++++++++++-------
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 3 ++
pulsar-client-cpp/tests/CustomLoggerTest.cc | 4 +--
8 files changed, 60 insertions(+), 28 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index 3471c75..47ab4f7 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -187,6 +187,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
numOfPendingLookupRequest_(0),
isTlsAllowInsecureConnection_(false) {
+ LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" <<
clientConfiguration.getConnectionTimeout());
if (clientConfiguration.isUseTls()) {
#if BOOST_VERSION >= 105400
boost::asio::ssl::context
ctx(boost::asio::ssl::context::tlsv12_client);
@@ -433,21 +434,28 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
}
} else if (endpointIterator != tcp::resolver::iterator()) {
+ LOG_WARN(cnxString_ << "Failed to establish connection: " <<
err.message());
// The connection failed. Try the next endpoint in the list.
- boost::system::error_code err;
- socket_->close(err); // ignore the error of close
- if (err) {
+ boost::system::error_code closeError;
+ socket_->close(closeError); // ignore the error of close
+ if (closeError) {
LOG_WARN(cnxString_ << "Failed to close socket: " <<
err.message());
}
connectTimeoutTask_->stop();
- connectTimeoutTask_->start();
- tcp::endpoint endpoint = *endpointIterator;
- socket_->async_connect(endpoint,
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
- std::placeholders::_1,
++endpointIterator));
+ ++endpointIterator;
+ if (endpointIterator != tcp::resolver::iterator()) {
+ LOG_DEBUG(cnxString_ << "Connecting to " <<
endpointIterator->endpoint() << "...");
+ connectTimeoutTask_->start();
+ tcp::endpoint endpoint = *endpointIterator;
+ socket_->async_connect(endpoint,
+
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
+ std::placeholders::_1,
++endpointIterator));
+ } else {
+ close();
+ }
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " <<
err.message());
close();
- return;
}
}
@@ -512,7 +520,7 @@ void ClientConnection::tcpConnectAsync() {
return;
}
- LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" <<
service_url.port());
+ LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" <<
service_url.port());
tcp::resolver::query query(service_url.host(),
std::to_string(service_url.port()));
resolver_->async_resolve(query,
std::bind(&ClientConnection::handleResolve, shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
@@ -531,12 +539,16 @@ void ClientConnection::handleResolve(const
boost::system::error_code& err,
if (state_ != TcpConnected) {
LOG_ERROR(cnxString_ << "Connection was not established in " <<
connectTimeoutTask_->getPeriodMs()
<< " ms, close the socket");
- PeriodicTask::ErrorCode ignoredError;
- socket_->close(ignoredError);
+ PeriodicTask::ErrorCode err;
+ socket_->close(err);
+ if (err) {
+ LOG_WARN(cnxString_ << "Failed to close socket: " <<
err.message());
+ }
}
connectTimeoutTask_->stop();
});
+ LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint()
<< "...");
connectTimeoutTask_->start();
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " <<
endpointIterator->host_name() //
@@ -1455,7 +1467,10 @@ void ClientConnection::close() {
}
if (tlsSocket_) {
- tlsSocket_->lowest_layer().close();
+ tlsSocket_->lowest_layer().close(err);
+ if (err) {
+ LOG_WARN(cnxString_ << "Failed to close TLS socket: " <<
err.message());
+ }
}
if (executor_) {
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc
b/pulsar-client-cpp/lib/ClientImpl.cc
index d6663e8..e051d76 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -514,7 +514,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
}
if (*numberOfOpenHandlers == 0 && callback) {
- callback(ResultOk);
+ handleClose(ResultOk, numberOfOpenHandlers, callback);
}
}
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc
b/pulsar-client-cpp/lib/ConnectionPool.cc
index cc5668b..bb4f5c2 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -46,7 +46,7 @@ void ConnectionPool::close() {
if (poolConnections_) {
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
ClientConnectionPtr cnx = cnxIt->second.lock();
- if (cnx && !cnx->isClosed()) {
+ if (cnx) {
cnx->close();
}
}
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 72b7a44..e881ac4 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -106,6 +106,7 @@ Future<Result, LookupDataResultPtr>
HTTPLookupService::getPartitionMetadataAsync
<< '/' << PARTITION_METHOD_NAME;
}
+ completeUrlStream << "?checkAllowAutoCreation=true";
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
shared_from_this(), promise,
completeUrlStream.str(),
PartitionMetaData));
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py
b/pulsar-client-cpp/python/pulsar/schema/definition.py
index dcb2d83..41c094d 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -139,6 +139,9 @@ class Record(with_metaclass(RecordMeta, object)):
return False
return True
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
def __str__(self):
return str(self.__dict__)
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index fc17c72..5810745 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -23,6 +23,7 @@ import logging
from unittest import TestCase, main
import time
import os
+import pulsar
import uuid
from datetime import timedelta
from pulsar import Client, MessageId, \
@@ -30,7 +31,7 @@ from pulsar import Client, MessageId, \
AuthenticationTLS, Authentication, AuthenticationToken,
InitialPosition, \
CryptoKeyReader
-from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError
+from _pulsar import ProducerConfiguration, ConsumerConfiguration
from schema_test import *
@@ -155,6 +156,7 @@ class PulsarTest(TestCase):
consumer.acknowledge(msg)
print('receive from {}'.format(msg.message_id()))
self.assertEqual(msg_id, msg.message_id())
+ client.close()
def test_producer_consumer(self):
client = Client(self.serviceUrl)
@@ -292,7 +294,7 @@ class PulsarTest(TestCase):
subscription_name='my-subscription',
schema=pulsar.schema.StringSchema())
producer = client.create_producer(topic=topic,
- schema=StringSchema())
+ schema=pulsar.schema.StringSchema())
producer.send('hello',
properties={
'a': '1',
@@ -319,10 +321,11 @@ class PulsarTest(TestCase):
tls_allow_insecure_connection=False,
authentication=AuthenticationTLS(certs_dir +
'client-cert.pem', certs_dir + 'client-key.pem'))
- consumer = client.subscribe('my-python-topic-tls-auth',
+ topic = 'my-python-topic-tls-auth-' + str(time.time())
+ consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
- producer = client.create_producer('my-python-topic-tls-auth')
+ producer = client.create_producer(topic)
producer.send(b'hello')
msg = consumer.receive(TM)
@@ -346,10 +349,11 @@ class PulsarTest(TestCase):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))
- consumer = client.subscribe('my-python-topic-tls-auth-2',
+ topic = 'my-python-topic-tls-auth-2-' + str(time.time())
+ consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
- producer = client.create_producer('my-python-topic-tls-auth-2')
+ producer = client.create_producer(topic)
producer.send(b'hello')
msg = consumer.receive(TM)
@@ -392,10 +396,11 @@ class PulsarTest(TestCase):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))
- consumer = client.subscribe('my-python-topic-tls-auth-3',
+ topic = 'my-python-topic-tls-auth-3-' + str(time.time())
+ consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
- producer = client.create_producer('my-python-topic-tls-auth-3')
+ producer = client.create_producer(topic)
producer.send(b'hello')
msg = consumer.receive(TM)
@@ -583,6 +588,8 @@ class PulsarTest(TestCase):
producer.send(b'hello-%d' % i)
self.assertEqual(producer.last_sequence_id(), i)
+ client.close()
+
doHttpPost(self.adminUrl +
'/admin/v2/namespaces/public/default/deduplication',
'false')
@@ -630,6 +637,8 @@ class PulsarTest(TestCase):
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)
+ client.close()
+
doHttpPost(self.adminUrl +
'/admin/v2/namespaces/public/default/deduplication',
'false')
@@ -820,10 +829,11 @@ class PulsarTest(TestCase):
def test_seek(self):
client = Client(self.serviceUrl)
- consumer = client.subscribe('my-python-topic-seek',
+ topic = 'my-python-topic-seek-' + str(time.time())
+ consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
- producer = client.create_producer('my-python-topic-seek')
+ producer = client.create_producer(topic)
for i in range(100):
if i > 0:
@@ -858,7 +868,7 @@ class PulsarTest(TestCase):
self.assertEqual(msg.data(), b'hello-42')
# repeat with reader
- reader = client.create_reader('my-python-topic-seek', MessageId.latest)
+ reader = client.create_reader(topic, MessageId.latest)
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)
@@ -1157,7 +1167,7 @@ class PulsarTest(TestCase):
try:
producer = client.create_producer('test_connect_timeout')
self.fail('create_producer should not succeed')
- except ConnectError as expected:
+ except pulsar.ConnectError as expected:
print('expected error: {} when create producer'.format(expected))
t2 = time.time()
self.assertGreater(t2 - t1, 1.0)
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 2f8524e..8e7eb6b 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -298,6 +298,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
std::string topicName = "testLookupThrottling";
ClientConfiguration config;
config.setConcurrentLookupRequest(0);
+ config.setLogger(new ConsoleLoggerFactory(Logger::LEVEL_DEBUG));
Client client(lookupUrl, config);
Producer producer;
@@ -307,6 +308,8 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
Consumer consumer1;
result = client.subscribe(topicName, "my-sub-name", consumer1);
ASSERT_EQ(ResultTooManyLookupRequestException, result);
+
+ client.close();
}
TEST(BasicEndToEndTest, testNonExistingTopic) {
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc
b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index f2a97d1..ec83e42 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
// reset to previous log factory
Client client("pulsar://localhost:6650", clientConfig);
client.close();
- ASSERT_EQ(logLines.size(), 2);
+ ASSERT_EQ(logLines.size(), 3);
LogUtils::resetLoggerFactory();
});
testThread.join();
@@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
Client client("pulsar://localhost:6650", clientConfig);
client.close();
// custom logger didn't get any new lines
- ASSERT_EQ(logLines.size(), 2);
+ ASSERT_EQ(logLines.size(), 3);
}
TEST(CustomLoggerTest, testConsoleLoggerFactory) {