This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new bc5f430 [Issue 11632][C++] Turning on more compiler warnings, and
enforcing warnings as errors (#11668)
bc5f430 is described below
commit bc5f430695034e3c2779328f1cea0aca5486ae5d
Author: Brett <[email protected]>
AuthorDate: Mon Aug 23 19:52:04 2021 -0500
[Issue 11632][C++] Turning on more compiler warnings, and enforcing
warnings as errors (#11668)
* [C++] Turning on more compiler warning flags, and enforcing warnings as
errors.
This change enables several key warning flags that catch common C++
mistakes (-Wall, -Wformat-security, and -Wvla) and makes the build fail if the
code contains warnings. This will help keep the code base clean and stable in
the long term. I was also planning to enable "-Wextra", since it contains
several helpful warnings, but the changes required to get -Wall enabled were
already getting large, so I'm going to split that effort into two PRs.
Most of the changes fall into four categories:
* The vast majority are fixing class member initialization order warnings.
These can lead to real bugs and are important to fix.
* Next were unused variables and functions, most of them in the tests.
* Functions that switch on an enum value with no default/fallback case,
leaving a code path with no return statement. These just needed a throw
statement after the switch.
* Finally, I also fixed several misuses of the "static" keyword: when
applied to global variables or functions, this actually means that the
identifier has "internal linkage", meaning it's not accessible outside the
current translation unit. Putting this in a header is almost never what you
want to do, but it's a common mistake since the meaning is different when
applied to class members. The "inline" keyword is a better choice in these
circumstances.
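To illustrate the first and third categories, here is a minimal sketch. The class, enum, and function names are hypothetical and do not come from the Pulsar sources, and it uses a plain throw where the commit itself uses BOOST_THROW_EXCEPTION. It assumes a C++11 compiler with -Wall enabled.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical class, not taken from the Pulsar sources.
// Members are always initialized in declaration order (first_, then second_),
// regardless of the order written in the constructor's initializer list.
class Ordered {
   public:
    explicit Ordered(int value)
        : first_(value),       // listed in declaration order, so -Wreorder stays quiet
          second_(first_ * 2)  // safe: first_ is already initialized at this point
    {}

    int first() const { return first_; }
    int second() const { return second_; }
    bool ready() const { return ready_; }

   private:
    int first_;
    int second_;
    bool ready_ = false;  // default member initializer instead of an initializer-list entry
};

// Hypothetical enum used only for this illustration.
enum class Codec { None, Zlib };

// Every enumerator returns inside the switch. The throw after the switch covers
// values outside the enumeration, so no code path falls off the end of the
// function, and no default case is needed (a default case would hide -Wswitch
// warnings when a new enumerator is added).
std::string codecName(Codec codec) {
    switch (codec) {
        case Codec::None:
            return "none";
        case Codec::Zlib:
            return "zlib";
    }
    throw std::logic_error("Invalid Codec enumeration value");
}
```

If the initializer list were written as second_(first_ * 2), first_(value), the compiler would still initialize first_ before second_, and -Wall would report the mismatch. The real danger is the opposite case, where a member declared earlier reads one declared later.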
Tests are still all passing.
[C++] Removing some unnecessary variable type changes that I meant to drop
from the previous commit. I decided to disable the signed/unsigned comparison
warning (-Wno-sign-compare), since there are typically tons of these and
they're tricky to fix: you have to go through and change all of your integer
types to get them to line up. Most C++ code bases I've worked on in the past
also disable this warning. It's too much pain for too little gain.
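For context, this is the kind of code that -Wsign-compare reports, along with one way to write it so the warning does not fire. This is a hedged sketch with a hypothetical helper name. The commit does not make this change; it turns the warning off instead.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper, not part of the Pulsar sources.
// Writing `index < items.size()` directly compares a signed int with an
// unsigned size_t. The int is converted to unsigned first, so a negative
// index would compare as a very large positive value.
bool inRange(int index, const std::vector<int>& items) {
    if (index < 0) {
        return false;  // reject negatives before converting to unsigned
    }
    return static_cast<std::size_t>(index) < items.size();
}
```

Applying this pattern everywhere usually means touching many integer declarations, which is the cost the message above refers to.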
* Fixed more warnings and issues arising from the Release mode build.
I had forgotten to build in release the first time, and it revealed more
warnings that needed to be addressed. Finally, I formatted everything with
clang-format.
* [C++] Compiling under clang with -Wall generates a slew of additional
warnings, a few of which were quite serious. This commit fixes these.
I also reformatted the CMakeLists file to use the newer options syntax and
better support clang and GCC side by side. This seems to work with recent GCC
and clang versions. Will need to further test with older compilers as well.
* Improved support for running tests outside the Docker environment, by
allowing a locally installed gtest-parallel to be detected.
* Fixed two more unit tests
* Tweaking the compiler settings to work under the older compiler versions
used in CentOS7
* Applying clang-format
* Forgot to actually fix the real unused variable warnings in this file
when CRC32 is disabled
* Realized that I was running "check-format" with a version of clang-format
that was too new, and it was formatting files differently. Probably should add
a better check for that at some point, but this change is already getting out
of control.
* Apparently my attempts to allow gtest-parallel on a non-root path cause
problems with the regular build, so I'm reverting them. Also fixing a
potential misuse of the tar command in the test service start script.
Co-authored-by: Matteo Merli <[email protected]>
(cherry picked from commit 4e60de6b4356f5727792484ed5e98dc1d454e839)
Resolved conflicts by modifying the following files:
- pulsar-client-cpp/lib/Producer.cc
- pulsar-client-cpp/lib/LogUtils.h
- pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
- pulsar-client-cpp/tests/CustomLoggerTest.cc
---
pulsar-client-cpp/CMakeLists.txt | 29 +++++++++++++++++-----
pulsar-client-cpp/examples/SampleConsumerCApi.c | 2 +-
.../examples/SampleConsumerListenerCApi.c | 2 +-
pulsar-client-cpp/examples/SampleReaderCApi.c | 2 +-
.../include/pulsar/BrokerConsumerStats.h | 3 ++-
.../include/pulsar/ClientConfiguration.h | 2 +-
pulsar-client-cpp/lib/Backoff.cc | 9 +++----
pulsar-client-cpp/lib/Backoff.h | 6 ++---
pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 8 ++----
pulsar-client-cpp/lib/BinaryProtoLookupService.h | 4 +--
pulsar-client-cpp/lib/BrokerConsumerStats.cc | 2 --
.../lib/BrokerConsumerStatsImplBase.h | 1 +
pulsar-client-cpp/lib/ClientConfiguration.cc | 4 ++-
pulsar-client-cpp/lib/ClientConnection.cc | 15 +++--------
pulsar-client-cpp/lib/ClientConnection.h | 22 ++++++++--------
pulsar-client-cpp/lib/Commands.cc | 6 ++---
pulsar-client-cpp/lib/CompressionCodec.cc | 3 +++
pulsar-client-cpp/lib/ConsumerImpl.cc | 10 +++-----
pulsar-client-cpp/lib/ConsumerImpl.h | 2 +-
pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc | 8 ++----
pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h | 4 +--
pulsar-client-cpp/lib/HTTPLookupService.cc | 2 +-
pulsar-client-cpp/lib/HTTPLookupService.h | 7 ++++--
pulsar-client-cpp/lib/MessageCrypto.cc | 3 +--
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 11 +++-----
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 6 ++---
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 4 ---
pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 8 +++---
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 5 +---
pulsar-client-cpp/lib/PartitionedProducerImpl.h | 21 ++++++----------
.../lib/PatternMultiTopicsConsumerImpl.cc | 12 ++++-----
pulsar-client-cpp/lib/RoundRobinMessageRouter.cc | 8 +++---
pulsar-client-cpp/lib/Schema.cc | 6 ++---
pulsar-client-cpp/lib/Utils.h | 2 +-
pulsar-client-cpp/lib/auth/AuthOauth2.cc | 3 ++-
pulsar-client-cpp/lib/c/c_structs.h | 2 +-
pulsar-client-cpp/lib/checksum/crc32c_sse42.cc | 5 ++--
pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc | 11 ++------
pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h | 8 +++---
pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc | 27 ++++++++------------
pulsar-client-cpp/lib/stats/ProducerStatsImpl.h | 12 ++++-----
pulsar-client-cpp/pulsar-test-service-start.sh | 2 +-
pulsar-client-cpp/python/CMakeLists.txt | 2 +-
pulsar-client-cpp/python/pulsar_test.py | 17 +++++++++++--
pulsar-client-cpp/python/src/authentication.cc | 2 +-
pulsar-client-cpp/python/src/config.cc | 15 ++++++++---
pulsar-client-cpp/python/src/message.cc | 2 +-
pulsar-client-cpp/python/src/producer.cc | 2 +-
pulsar-client-cpp/tests/AuthPluginTest.cc | 15 ++++++-----
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 10 ++++----
pulsar-client-cpp/tests/BatchMessageTest.cc | 11 ++------
pulsar-client-cpp/tests/ClientTest.cc | 3 ++-
.../tests/ConsumerConfigurationTest.cc | 4 +--
pulsar-client-cpp/tests/KeyBasedBatchingTest.cc | 9 ++++---
54 files changed, 191 insertions(+), 210 deletions(-)
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 5cbbea6..c7b9afc 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -77,16 +77,28 @@ set(Boost_NO_BOOST_CMAKE ON)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_C_STANDARD 11)
-if (MSVC)
- # Visual Studio compiler flags
+# Compiler specific configuration:
+# https://stackoverflow.com/questions/10046114/in-cmake-how-can-i-test-if-the-compiler-is-clang
+if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
add_definitions(-DWIN32_LEAN_AND_MEAN -DNOGDI -D_WIN32_WINNT=0x0501 -D_CRT_SECURE_NO_WARNINGS)
add_compile_options(/wd4244 /wd4267 /wd4018 /wd4715 /wd4251 /wd4275)
-else()
- add_compile_options(-Werror=switch -Wno-deprecated-declarations)
+elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Intel")
+ # ?? Don't have this to test with
+else() # GCC or Clang are mostly compatible:
+ # Turn on warnings and enable warnings-as-errors:
+ add_compile_options(-Wall -Wformat-security -Wvla -Werror)
+ # Turn off certain warnings that are too much pain for too little gain:
+ add_compile_options(-Wno-sign-compare -Wno-deprecated-declarations -Wno-error=cpp)
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
add_compile_options(-msse4.2 -mpclmul)
endif()
-endif(MSVC)
+ # Options unique to Clang or GCC:
+ if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
+ add_compile_options(-Qunused-arguments)
+ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9))
+ add_compile_options(-Wno-stringop-truncation)
+ endif()
+endif()
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
@@ -229,6 +241,8 @@ if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9)
MESSAGE(STATUS "Using Boost::Regex")
else()
MESSAGE(STATUS "Using std::regex")
+ # Turn on color error messages and show additional help with errors (only available in GCC v4.9+):
+ add_compile_options(-fdiagnostics-show-option -fdiagnostics-color)
endif()
if(BUILD_PERF_TOOLS)
@@ -285,7 +299,10 @@ if (NOT APPLE AND NOT MSVC)
# we don't set options below to build _pulsar.so
set(CMAKE_CXX_FLAGS_PYTHON "${CMAKE_CXX_FLAGS}")
# Hide all non-exported symbols to avoid conflicts
- set(CMAKE_CXX_FLAGS " -fvisibility=hidden -Wl,--exclude-libs,ALL ${CMAKE_CXX_FLAGS}")
+ add_compile_options(-fvisibility=hidden)
+ if (CMAKE_COMPILER_IS_GNUCC)
+ add_compile_options(-Wl,--exclude-libs,ALL)
+ endif ()
endif ()
if (LIB_ZSTD)
diff --git a/pulsar-client-cpp/examples/SampleConsumerCApi.c
b/pulsar-client-cpp/examples/SampleConsumerCApi.c
index 97b074d..ade7fb9 100644
--- a/pulsar-client-cpp/examples/SampleConsumerCApi.c
+++ b/pulsar-client-cpp/examples/SampleConsumerCApi.c
@@ -43,7 +43,7 @@ int main() {
}
printf("Received message with payload: '%.*s'\n",
pulsar_message_get_length(message),
- pulsar_message_get_data(message));
+ (const char*)pulsar_message_get_data(message));
pulsar_consumer_acknowledge(consumer, message);
pulsar_message_free(message);
diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
index e75c5d5..1b04980 100644
--- a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
+++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
@@ -22,7 +22,7 @@
static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t*
message, void* ctx) {
printf("Received message with payload: '%.*s'\n",
pulsar_message_get_length(message),
- pulsar_message_get_data(message));
+ (const char*)pulsar_message_get_data(message));
pulsar_consumer_acknowledge(consumer, message);
pulsar_message_free(message);
diff --git a/pulsar-client-cpp/examples/SampleReaderCApi.c
b/pulsar-client-cpp/examples/SampleReaderCApi.c
index c0eec06..7510ffa 100644
--- a/pulsar-client-cpp/examples/SampleReaderCApi.c
+++ b/pulsar-client-cpp/examples/SampleReaderCApi.c
@@ -43,7 +43,7 @@ int main() {
}
printf("Received message with payload: '%.*s'\n",
pulsar_message_get_length(message),
- pulsar_message_get_data(message));
+ (const char*)pulsar_message_get_data(message));
pulsar_message_free(message);
}
diff --git a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h
b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h
index 62fcedb..b4fe9e0 100644
--- a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h
+++ b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h
@@ -37,9 +37,10 @@ class PULSAR_PUBLIC BrokerConsumerStats {
std::shared_ptr<BrokerConsumerStatsImplBase> impl_;
public:
+ BrokerConsumerStats() = default;
explicit BrokerConsumerStats(std::shared_ptr<BrokerConsumerStatsImplBase>
impl);
- BrokerConsumerStats();
+ virtual ~BrokerConsumerStats() = default;
/** Returns true if the Stats are still valid **/
virtual bool isValid() const;
diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index 2f2c461..451ab4e 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -166,7 +166,7 @@ class PULSAR_PUBLIC ClientConfiguration {
/**
* @return the path to the trusted TLS certificate file
*/
- std::string getTlsTrustCertsFilePath() const;
+ const std::string& getTlsTrustCertsFilePath() const;
/**
* Configure whether the Pulsar client accepts untrusted TLS certificates
from brokers.
diff --git a/pulsar-client-cpp/lib/Backoff.cc b/pulsar-client-cpp/lib/Backoff.cc
index e202db3..790d3f8 100644
--- a/pulsar-client-cpp/lib/Backoff.cc
+++ b/pulsar-client-cpp/lib/Backoff.cc
@@ -18,16 +18,13 @@
*/
#include "Backoff.h"
#include <boost/random/uniform_int_distribution.hpp>
+#include <algorithm>
+#include <time.h> /* time */
namespace pulsar {
Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max, const
TimeDuration& mandatoryStop)
- : initial_(initial),
- max_(max),
- next_(initial),
- mandatoryStopMade_(false),
- mandatoryStop_(mandatoryStop),
- rng_(time(NULL)) {}
+ : initial_(initial), max_(max), next_(initial),
mandatoryStop_(mandatoryStop), rng_(time(NULL)) {}
TimeDuration Backoff::next() {
TimeDuration current = next_;
diff --git a/pulsar-client-cpp/lib/Backoff.h b/pulsar-client-cpp/lib/Backoff.h
index 458883f..93b97ad 100644
--- a/pulsar-client-cpp/lib/Backoff.h
+++ b/pulsar-client-cpp/lib/Backoff.h
@@ -20,9 +20,6 @@
#define _PULSAR_BACKOFF_HEADER_
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/random/mersenne_twister.hpp>
-#include <stdlib.h> /* srand, rand */
-#include <algorithm>
-#include <time.h> /* time */
#include <pulsar/defines.h>
namespace pulsar {
@@ -42,7 +39,8 @@ class PULSAR_PUBLIC Backoff {
TimeDuration mandatoryStop_;
boost::posix_time::ptime firstBackoffTime_;
boost::random::mt19937 rng_;
- bool mandatoryStopMade_;
+ bool mandatoryStopMade_ = false;
+
friend class PulsarFriend;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index c7b7b09..716ff91 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -34,15 +34,11 @@ namespace pulsar {
* Constructor
*/
BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool,
const std::string& lookupUrl)
- : cnxPool_(cnxPool), serviceUrl_(lookupUrl), mutex_(),
requestIdGenerator_(0) {}
+ : serviceUrl_(lookupUrl), cnxPool_(cnxPool) {}
BinaryProtoLookupService::BinaryProtoLookupService(ConnectionPool& cnxPool,
const std::string& lookupUrl,
const std::string&
listenerName)
- : cnxPool_(cnxPool),
- serviceUrl_(lookupUrl),
- listenerName_(listenerName),
- mutex_(),
- requestIdGenerator_(0) {}
+ : serviceUrl_(lookupUrl), listenerName_(listenerName), cnxPool_(cnxPool) {}
/*
* @param topicName topic name to get broker for
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index c4cf0d0..bbf7632 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -48,11 +48,11 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public
LookupService {
private:
std::mutex mutex_;
- uint64_t requestIdGenerator_;
+ uint64_t requestIdGenerator_ = 0;
std::string serviceUrl_;
- ConnectionPool& cnxPool_;
std::string listenerName_;
+ ConnectionPool& cnxPool_;
void sendTopicLookupRequest(const std::string& topicName, bool
authoritative,
const std::string& listenerName, Result result,
diff --git a/pulsar-client-cpp/lib/BrokerConsumerStats.cc
b/pulsar-client-cpp/lib/BrokerConsumerStats.cc
index 51b6fa0..e3e1dea 100644
--- a/pulsar-client-cpp/lib/BrokerConsumerStats.cc
+++ b/pulsar-client-cpp/lib/BrokerConsumerStats.cc
@@ -23,8 +23,6 @@
namespace pulsar {
BrokerConsumerStats::BrokerConsumerStats(std::shared_ptr<BrokerConsumerStatsImplBase>
impl) : impl_(impl) {}
-BrokerConsumerStats::BrokerConsumerStats() {}
-
std::shared_ptr<BrokerConsumerStatsImplBase> BrokerConsumerStats::getImpl()
const { return impl_; }
bool BrokerConsumerStats::isValid() const { return impl_->isValid(); }
diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImplBase.h
b/pulsar-client-cpp/lib/BrokerConsumerStatsImplBase.h
index 0bb27dd..282dfc0 100644
--- a/pulsar-client-cpp/lib/BrokerConsumerStatsImplBase.h
+++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImplBase.h
@@ -25,6 +25,7 @@
namespace pulsar {
class BrokerConsumerStatsImplBase {
public:
+ virtual ~BrokerConsumerStatsImplBase() = default;
/** Returns true if the Stats are still valid **/
virtual bool isValid() const = 0;
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc
b/pulsar-client-cpp/lib/ClientConfiguration.cc
index 188980a..4072f63 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -87,7 +87,9 @@ ClientConfiguration&
ClientConfiguration::setTlsTrustCertsFilePath(const std::st
return *this;
}
-std::string ClientConfiguration::getTlsTrustCertsFilePath() const { return
impl_->tlsTrustCertsFilePath; }
+const std::string& ClientConfiguration::getTlsTrustCertsFilePath() const {
+ return impl_->tlsTrustCertsFilePath;
+}
ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool
allowInsecure) {
impl_->tlsAllowInsecureConnection = allowInsecure;
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index 3bcacc1..0e27e60 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -155,8 +155,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
ExecutorServicePtr executor,
const ClientConfiguration&
clientConfiguration,
const AuthenticationPtr& authentication)
- : state_(Pending),
- operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
+ : operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
authentication_(authentication),
serverProtocolVersion_(ProtocolVersion_MIN),
executor_(executor),
@@ -172,21 +171,13 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
- error_(boost::system::error_code()),
incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
- incomingCmd_(),
connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
clientConfiguration.getConnectionTimeout())),
- pendingWriteBuffers_(),
- pendingWriteOperations_(0),
outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
- outgoingCmd_(),
- havePendingPingRequest_(false),
- keepAliveTimer_(),
- maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
- numOfPendingLookupRequest_(0),
- isTlsAllowInsecureConnection_(false) {
+ maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()) {
+
LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" <<
clientConfiguration.getConnectionTimeout());
if (clientConfiguration.isUseTls()) {
#if BOOST_VERSION >= 105400
diff --git a/pulsar-client-cpp/lib/ClientConnection.h
b/pulsar-client-cpp/lib/ClientConnection.h
index cd88f6e..4ba99fa 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -249,7 +249,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
}
}
- State state_;
+ State state_ = Pending;
TimeDuration operationsTimeout_;
AuthenticationPtr authentication_;
int serverProtocolVersion_;
@@ -264,9 +264,13 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
+#if BOOST_VERSION >= 106600
+ boost::asio::strand<boost::asio::io_service::executor_type> strand_;
+#else
+ boost::asio::io_service::strand strand_;
+#endif
const std::string logicalAddress_;
-
/*
* stores address of the service, for ex. pulsar://localhost:6650
*/
@@ -312,7 +316,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
// Pending buffers to write on the socket
std::deque<boost::any> pendingWriteBuffers_;
- int pendingWriteOperations_;
+ int pendingWriteOperations_ = 0;
SharedBuffer outgoingBuffer_;
proto::BaseCommand outgoingCmd_;
@@ -321,7 +325,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
HandlerAllocator writeHandlerAllocator_;
// Signals whether we're waiting for a response from broker
- bool havePendingPingRequest_;
+ bool havePendingPingRequest_ = false;
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;
@@ -330,16 +334,10 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
uint32_t maxPendingLookupRequest_;
- uint32_t numOfPendingLookupRequest_;
+ uint32_t numOfPendingLookupRequest_ = 0;
friend class PulsarFriend;
- bool isTlsAllowInsecureConnection_;
-
-#if BOOST_VERSION >= 106600
- boost::asio::strand<boost::asio::io_service::executor_type> strand_;
-#else
- boost::asio::io_service::strand strand_;
-#endif
+ bool isTlsAllowInsecureConnection_ = false;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Commands.cc
b/pulsar-client-cpp/lib/Commands.cc
index b90f5a8..0d95bb3 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -163,12 +163,11 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers,
BaseCommand& cmd, uint
4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize; //
cmdLength + cmdSize + magicLength +
// checksumSize + msgMetadataLength + msgMetadataSize
int totalSize = headerContentSize + payloadSize;
- int headersSize = 4 + headerContentSize; // totalSize + headerLength
int checksumReaderIndex = -1;
headers.reset();
- assert(headers.writableBytes() >= headersSize);
- headers.writeUnsignedInt(totalSize); // External frame
+ assert(headers.writableBytes() >= (4 + headerContentSize)); // totalSize + headerLength
+ headers.writeUnsignedInt(totalSize); // External frame
// Write cmd
headers.writeUnsignedInt(cmdSize);
@@ -651,6 +650,7 @@ std::string Commands::messageType(BaseCommand_Type type) {
return "END_TXN_ON_SUBSCRIPTION_RESPONSE";
break;
};
+ BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value"));
}
void Commands::initBatchMessageMetadata(const Message& msg,
pulsar::proto::MessageMetadata& batchMetadata) {
diff --git a/pulsar-client-cpp/lib/CompressionCodec.cc
b/pulsar-client-cpp/lib/CompressionCodec.cc
index b092cf9..c17b534 100644
--- a/pulsar-client-cpp/lib/CompressionCodec.cc
+++ b/pulsar-client-cpp/lib/CompressionCodec.cc
@@ -46,6 +46,7 @@ CompressionCodec&
CompressionCodecProvider::getCodec(CompressionType compression
default:
return compressionCodecNone_;
}
+ BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType enumeration value"));
}
CompressionType CompressionCodecProvider::convertType(proto::CompressionType
type) {
@@ -61,6 +62,7 @@ CompressionType
CompressionCodecProvider::convertType(proto::CompressionType typ
case proto::SNAPPY:
return CompressionSNAPPY;
}
+ BOOST_THROW_EXCEPTION(std::logic_error("Invalid proto::CompressionType enumeration value"));
}
proto::CompressionType CompressionCodecProvider::convertType(CompressionType
type) {
@@ -76,6 +78,7 @@ proto::CompressionType
CompressionCodecProvider::convertType(CompressionType typ
case CompressionSNAPPY:
return proto::SNAPPY;
}
+ BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType enumeration value"));
}
SharedBuffer CompressionCodecNone::encode(const SharedBuffer& raw) { return
raw; }
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index fea7d4d..4b6479d 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -53,20 +53,14 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client,
const std::string& topic,
startMessageId_(startMessageId),
// This is the initial capacity of the queue
incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
- pendingReceives_(),
availablePermits_(0),
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
consumerId_(client->newConsumerId()),
consumerName_(config_.getConsumerName()),
- partitionIndex_(-1),
- consumerCreatedPromise_(),
messageListenerRunning_(true),
batchAcknowledgementTracker_(topic_, subscriptionName,
(long)consumerId_),
- brokerConsumerStats_(),
- consumerStatsBasePtr_(),
negativeAcksTracker_(client, *this, conf),
ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
- msgCrypto_(),
readCompacted_(conf.isReadCompacted()),
lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
std::stringstream consumerStrStream;
@@ -774,6 +768,7 @@ inline proto::CommandSubscribe_SubType
ConsumerImpl::getSubType() {
case ConsumerKeyShared:
return proto::CommandSubscribe_SubType_Key_Shared;
}
+ BOOST_THROW_EXCEPTION(std::logic_error("Invalid ConsumerType enumeration value"));
}
inline proto::CommandSubscribe_InitialPosition
ConsumerImpl::getInitialPosition() {
@@ -785,6 +780,7 @@ inline proto::CommandSubscribe_InitialPosition
ConsumerImpl::getInitialPosition(
case InitialPositionEarliest:
return
proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Earliest;
}
+ BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition enumeration value"));
}
void ConsumerImpl::statsCallback(Result res, ResultCallback callback,
proto::CommandAck_AckType ackType) {
@@ -1156,7 +1152,7 @@ void
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
return;
}
- getLastMessageIdAsync([this, lastDequed, callback](Result result,
MessageId messageId) {
+ getLastMessageIdAsync([lastDequed, callback](Result result, MessageId
messageId) {
if (result == ResultOk) {
if (messageId > lastDequed && messageId.entryId() != -1) {
callback(ResultOk, true);
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h
b/pulsar-client-cpp/lib/ConsumerImpl.h
index cde8293..b176209 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -197,7 +197,7 @@ class ConsumerImpl : public ConsumerImplBase,
uint64_t consumerId_;
std::string consumerName_;
std::string consumerStr_;
- int32_t partitionIndex_;
+ int32_t partitionIndex_ = -1;
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
std::atomic_bool messageListenerRunning_;
CompressionCodecProvider compressionCodecProvider_;
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
index 1409c50..e61ca70 100644
--- a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
@@ -21,12 +21,8 @@
namespace pulsar {
-EncryptionKeyInfoImpl::EncryptionKeyInfoImpl() : key_(), metadata_() {}
-
-EncryptionKeyInfoImpl::EncryptionKeyInfoImpl(std::string key, StringMap&
metadata) {
- key_ = key;
- metadata_ = metadata;
-}
+EncryptionKeyInfoImpl::EncryptionKeyInfoImpl(std::string key, StringMap&
metadata)
+ : metadata_(metadata), key_(key) {}
std::string& EncryptionKeyInfoImpl::getKey() { return key_; }
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
index 0d0298a..0470d1c 100644
--- a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
@@ -19,8 +19,8 @@
#ifndef LIB_ENCRYPTIONKEYINFOIMPL_H_
#define LIB_ENCRYPTIONKEYINFOIMPL_H_
-#include <iostream>
#include <map>
+#include <string>
#include <pulsar/defines.h>
namespace pulsar {
@@ -29,7 +29,7 @@ class PULSAR_PUBLIC EncryptionKeyInfoImpl {
public:
typedef std::map<std::string, std::string> StringMap;
- EncryptionKeyInfoImpl();
+ EncryptionKeyInfoImpl() = default;
EncryptionKeyInfoImpl(std::string key, StringMap& metadata);
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 45e56d0..a54a4c1 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -52,9 +52,9 @@ HTTPLookupService::HTTPLookupService(const std::string
&lookupUrl,
:
executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
+ tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
isUseTls_(clientConfiguration.isUseTls()),
tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
- tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
tlsValidateHostname_(clientConfiguration.isValidateHostName()) {
if (lookupUrl[lookupUrl.length() - 1] == '/') {
// Remove trailing '/'
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h
b/pulsar-client-cpp/lib/HTTPLookupService.h
index 4cc6e08..166a14a 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -32,19 +32,22 @@ class HTTPLookupService : public LookupService, public
std::enable_shared_from_t
~CurlInitializer();
};
static CurlInitializer curlInitializer;
+
enum RequestType
{
Lookup,
PartitionMetaData
};
+
typedef Promise<Result, LookupDataResultPtr> LookupPromise;
+
ExecutorServiceProviderPtr executorProvider_;
std::string adminUrl_;
AuthenticationPtr authenticationPtr_;
int lookupTimeoutInSeconds_;
- bool tlsAllowInsecure_;
- bool isUseTls_;
std::string tlsTrustCertsFilePath_;
+ bool isUseTls_;
+ bool tlsAllowInsecure_;
bool tlsValidateHostname_;
static LookupDataResultPtr parsePartitionData(const std::string&);
diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc
b/pulsar-client-cpp/lib/MessageCrypto.cc
index 5cb2ab4..8798dbf 100644
--- a/pulsar-client-cpp/lib/MessageCrypto.cc
+++ b/pulsar-client-cpp/lib/MessageCrypto.cc
@@ -111,8 +111,7 @@ void MessageCrypto::removeExpiredDataKey() {
auto dataKeyCacheIter = dataKeyCache_.begin();
while (dataKeyCacheIter != dataKeyCache_.end()) {
- auto dataKeyEntry = dataKeyCacheIter->second;
- boost::posix_time::time_duration td = now - dataKeyEntry.second;
+ const auto dataKeyEntry = dataKeyCacheIter->second;
if ((now - dataKeyEntry.second) > expireTime) {
dataKeyCache_.erase(dataKeyCacheIter++);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 64aaada..4e31e64 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -30,11 +30,9 @@
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
subscriptionName_(subscriptionName),
topic_(topicName ? topicName->toString() : "EmptyTopics"),
conf_(conf),
- state_(Pending),
messages_(conf.getReceiverQueueSize()),
listenerExecutor_(client->getListenerExecutorProvider()->get()),
messageListener_(conf.getMessageListener()),
- pendingReceives_(),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
topics_(topics) {
@@ -83,8 +81,7 @@ void MultiTopicsConsumerImpl::start() {
void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer
consumer,
const std::string&
topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate) {
- int previous = topicsNeedCreate->fetch_sub(1);
- assert(previous > 0);
+ (*topicsNeedCreate)--;
if (result != ResultOk) {
setState(Failed);
@@ -255,8 +252,7 @@ void
MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
std::shared_ptr<std::atomic<int>> consumerUnsubed,
ResultCallback callback)
{
- int previous = consumerUnsubed->fetch_add(1);
- assert(previous < numberTopicPartitions_->load());
+ (*consumerUnsubed)++;
if (result != ResultOk) {
setState(Failed);
@@ -320,8 +316,7 @@ void
MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed, int numberPartitions,
TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback) {
- int previous = consumerUnsubed->fetch_add(1);
- assert(previous < numberPartitions);
+ (*consumerUnsubed)++;
if (result != ResultOk) {
setState(Failed);
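
Note on the fetch_sub/fetch_add hunks above: the removed lines stored the previous counter value only to feed an assert. A likely reason they trip the new flags (an inference from the commit message's "unused variables" category, not something the diff states) is that assert expands to nothing when NDEBUG is defined, which leaves the local unused. The sketch below uses hypothetical function names, not Pulsar code, and contrasts the old pattern, kept alive with a (void) cast, with the pattern the commit adopts.

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Hypothetical names for illustration; these functions are not part of Pulsar.

// Old pattern: the previous value is only read inside assert(). With NDEBUG
// defined the assert disappears, so `previous` would be reported as unused
// under -Wall unless it is referenced some other way, e.g. a (void) cast.
inline void decrementWithCheck(const std::shared_ptr<std::atomic<int>>& counter) {
    int previous = counter->fetch_sub(1);
    assert(previous > 0);
    (void)previous;  // keeps the variable "used" in release builds
}

// Pattern the commit switches to: atomically decrement and keep no local.
inline void decrementOnly(const std::shared_ptr<std::atomic<int>>& counter) {
    (*counter)--;
}
```

The commit takes the second route and drops the sanity check; the cast is one conventional alternative when the check is worth keeping in debug builds.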
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 3a1249b..aa6b261 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -98,12 +98,12 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
std::map<std::string, int> topicsPartitions_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
- MultiTopicsConsumerState state_;
- std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
- LookupServicePtr lookupServicePtr_;
+ MultiTopicsConsumerState state_ = Pending;
BlockingQueue<Message> messages_;
ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
+ LookupServicePtr lookupServicePtr_;
+ std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
Promise<Result, ConsumerImplBaseWeakPtr>
multiTopicsConsumerCreatedPromise_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 7aa506e..e43b509 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -31,14 +31,10 @@
PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
subscriptionName_(subscriptionName),
topicName_(topicName),
numPartitions_(numPartitions),
- numConsumersCreated_(0),
conf_(conf),
- state_(Pending),
- unsubscribedSoFar_(0),
messages_(1000),
listenerExecutor_(client->getListenerExecutorProvider()->get()),
messageListener_(conf.getMessageListener()),
- pendingReceives_(),
topic_(topicName->toString()) {
std::stringstream consumerStrStream;
consumerStrStream << "[Partitioned Consumer: " << topic_ << "," <<
subscriptionName << ","
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 83ada95..7fa0ccd 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -84,7 +84,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
const std::string subscriptionName_;
const TopicNamePtr topicName_;
unsigned int numPartitions_;
- unsigned int numConsumersCreated_;
+ unsigned int numConsumersCreated_ = 0;
const ConsumerConfiguration conf_;
typedef std::vector<ConsumerImplPtr> ConsumerList;
ConsumerList consumers_;
@@ -92,8 +92,8 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
mutable std::mutex consumersMutex_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
- PartitionedConsumerState state_;
- unsigned int unsubscribedSoFar_;
+ PartitionedConsumerState state_ = Pending;
+ unsigned int unsubscribedSoFar_ = 0;
BlockingQueue<Message> messages_;
ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
@@ -104,7 +104,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
DeadlineTimerPtr partitionsUpdateTimer_;
boost::posix_time::time_duration partitionsUpdateInterval_;
LookupServicePtr lookupServicePtr_;
- /* methods */
+
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;
ConsumerConfiguration getSinglePartitionConsumerConfig() const;
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 94bf353..bdd23ed 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -37,11 +37,8 @@
PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
topicName_(topicName),
topic_(topicName_->toString()),
conf_(config),
- state_(Pending),
topicMetadata_(new TopicMetadataImpl(numPartitions)),
flushedPartitions_(0) {
- numProducersCreated_ = 0;
- cleanup_ = false;
routerPolicy_ = getMessageRouter();
int maxPendingMessagesPerPartition =
@@ -348,7 +345,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback
callback) {
flushPromise_ = std::make_shared<Promise<Result, bool_type>>();
} else {
// already in flushing, register a listener callback
- std::function<void(Result, bool)> listenerCallback = [this, callback](Result result, bool_type v) {
+ auto listenerCallback = [callback](Result result, bool_type v) {
if (v) {
callback(ResultOk);
} else {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 60881f2..874d6cd 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -83,40 +83,31 @@ class PartitionedProducerImpl : public ProducerImplBase,
const TopicNamePtr topicName_;
const std::string topic_;
- std::unique_ptr<TopicMetadata> topicMetadata_;
-
- unsigned int numProducersCreated_;
+ unsigned int numProducersCreated_ = 0;
/*
* set when one or more Single Partition Creation fails, close will
cleanup and fail the create callbackxo
*/
- bool cleanup_;
+ bool cleanup_ = false;
ProducerConfiguration conf_;
typedef std::vector<ProducerImplPtr> ProducerList;
-
ProducerList producers_;
// producersMutex_ is used to share producers_ and topicMetadata_
mutable std::mutex producersMutex_;
-
- unsigned int getNumPartitions() const;
- unsigned int getNumPartitionsWithLock() const;
-
- ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
-
MessageRoutingPolicyPtr routerPolicy_;
// mutex_ is used to share state_, and numProducersCreated_
mutable std::mutex mutex_;
- PartitionedProducerState state_;
+ PartitionedProducerState state_ = Pending;
// only set this promise to value, when producers on all partitions are
created.
Promise<Result, ProducerImplBaseWeakPtr>
partitionedProducerCreatedPromise_;
- MessageRoutingPolicyPtr getMessageRouter();
+ std::unique_ptr<TopicMetadata> topicMetadata_;
std::atomic<int> flushedPartitions_;
std::shared_ptr<Promise<Result, bool_type>> flushPromise_;
@@ -126,6 +117,10 @@ class PartitionedProducerImpl : public ProducerImplBase,
boost::posix_time::time_duration partitionsUpdateInterval_;
LookupServicePtr lookupServicePtr_;
+ unsigned int getNumPartitions() const;
+ unsigned int getNumPartitionsWithLock() const;
+ ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
+ MessageRoutingPolicyPtr getMessageRouter();
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr&
partitionMetadata);
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index f5cba85..8e92fd3 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -139,8 +139,7 @@ void
PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopic
void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result,
const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate,
ResultCallback callback) {
- int previous = topicsNeedCreate->fetch_sub(1);
- assert(previous > 0);
+ (*topicsNeedCreate)--;
if (result != ResultOk) {
LOG_ERROR("Failed when subscribed to topic " << topic << " Error - "
<< result);
@@ -162,12 +161,11 @@ void
PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedT
callback(ResultOk);
return;
}
- int topicsNumber = removedTopics->size();
- std::shared_ptr<std::atomic<int>> topicsNeedUnsub = std::make_shared<std::atomic<int>>(topicsNumber);
- ResultCallback oneTopicUnsubscribedCallback = [this, topicsNeedUnsub, callback](Result result) {
- int previous = topicsNeedUnsub->fetch_sub(1);
- assert(previous > 0);
+ auto topicsNeedUnsub = std::make_shared<std::atomic<int>>(removedTopics->size());
+
+ ResultCallback oneTopicUnsubscribedCallback = [topicsNeedUnsub, callback](Result result) {
+ (*topicsNeedUnsub)--;
if (result != ResultOk) {
LOG_ERROR("Failed when unsubscribe to one topic. Error - " << result);
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
index feb3ab0..51d10e2 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
@@ -30,12 +30,12 @@
RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingS
boost::posix_time::time_duration maxBatchingDelay)
: MessageRouterBase(hashingScheme),
batchingEnabled_(batchingEnabled),
- lastPartitionChange_(TimeUtils::currentTimeMillis()),
- msgCounter_(0),
- cumulativeBatchSize_(0),
maxBatchingMessages_(maxBatchingMessages),
maxBatchingSize_(maxBatchingSize),
- maxBatchingDelay_(maxBatchingDelay) {
+ maxBatchingDelay_(maxBatchingDelay),
+ lastPartitionChange_(TimeUtils::currentTimeMillis()),
+ msgCounter_(0),
+ cumulativeBatchSize_(0) {
boost::random::mt19937 rng(time(nullptr));
boost::random::uniform_int_distribution<int> dist;
currentPartitionCursor_ = dist(rng);
diff --git a/pulsar-client-cpp/lib/Schema.cc b/pulsar-client-cpp/lib/Schema.cc
index f883540..5315a86 100644
--- a/pulsar-client-cpp/lib/Schema.cc
+++ b/pulsar-client-cpp/lib/Schema.cc
@@ -72,14 +72,14 @@ class PULSAR_PUBLIC SchemaInfoImpl {
public:
const std::string name_;
const std::string schema_;
- const SchemaType type_;
+ const SchemaType type_ = BYTES;
const std::map<std::string, std::string> properties_;
- SchemaInfoImpl() : name_("BYTES"), schema_(), type_(BYTES), properties_() {}
+ SchemaInfoImpl() : name_("BYTES") {}
SchemaInfoImpl(SchemaType schemaType, const std::string &name, const std::string &schema,
const StringMap &properties)
- : type_(schemaType), name_(name), schema_(schema), properties_(properties) {}
+ : name_(name), schema_(schema), type_(schemaType), properties_(properties) {}
};
SchemaInfo::SchemaInfo() : impl_(std::make_shared<SchemaInfoImpl>()) {}
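
Note on the initializer-list reordering above: C++ constructs non-static data members in the order they are declared in the class, regardless of the order written in the constructor's initializer list. The sketch below is a minimal illustration with hypothetical types (Tracer, Example), not Pulsar code. The pragma only silences -Wreorder so the deliberately misordered list compiles cleanly under strict flags.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Records the order in which members are actually constructed.
inline std::vector<std::string>& initLog() {
    static std::vector<std::string> log;
    return log;
}

// Hypothetical member type that logs its own construction.
struct Tracer {
    explicit Tracer(const std::string& name) { initLog().push_back(name); }
};

#if defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wreorder"
#endif
// Hypothetical class: declared order is first_, then second_.
struct Example {
    Tracer first_;
    Tracer second_;
    // The list names second_ first, yet first_ is still constructed first,
    // because construction follows declaration order.
    Example() : second_("second"), first_("first") {}
};
#if defined(__GNUC__)
#pragma GCC diagnostic pop
#endif
```

If one member's initializer reads another member, a mismatch like this can read an uninitialized value, which is why the commit aligns the lists with the declarations.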
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index fd50e97..e662ecf 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -60,7 +60,7 @@ struct WaitForCallbackType {
void operator()(T result) { m_promise.setValue(result); }
};
-static std::ostream& operator<<(std::ostream& os, const std::map<Result, unsigned long>& m) {
+inline std::ostream& operator<<(std::ostream& os, const std::map<Result, unsigned long>& m) {
os << "{";
for (std::map<Result, unsigned long>::const_iterator it = m.begin(); it != m.end(); it++) {
os << "[Key: " << strResult(it->first) << ", Value: " << it->second << "], ";
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
index ed39ea5..87a217e 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -103,7 +103,8 @@ Oauth2CachedToken::Oauth2CachedToken(Oauth2TokenResultPtr
token) {
if (expiredIn > 0) {
expiresAt_ = expiredIn + currentTimeMillis();
} else {
- throw std::runtime_error("ExpiresIn in Oauth2TokenResult invalid value: " + expiredIn);
+ throw std::runtime_error("ExpiresIn in Oauth2TokenResult invalid value: " +
+ std::to_string(expiredIn));
}
authData_ = AuthenticationDataPtr(new AuthDataOauth2(token->getAccessToken()));
}
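
Note on the std::to_string change above: adding an integer to a string literal is pointer arithmetic on a const char*, not concatenation, so the removed line never appended the number to the message. The sketch below assumes expiredIn is a 64-bit integer (the diff does not show its declaration) and uses hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper mirroring the fixed line: the number becomes a
// std::string first, so operator+ performs real concatenation.
inline std::string invalidExpiresInMessage(std::int64_t expiredIn) {
    return "ExpiresIn in Oauth2TokenResult invalid value: " + std::to_string(expiredIn);
}

// What pointer-plus-integer does instead: it offsets into the character
// array. Here the offset stays in bounds, so the result is well defined.
inline std::string pointerArithmeticDemo() {
    const char* text = "abc";
    return std::string(text + 1);  // skips the first character
}
```

With a zero or negative expiredIn, as in the else branch of the hunk, the old expression pointed at or before the start of the literal, and any offset outside the array is undefined behaviour.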
diff --git a/pulsar-client-cpp/lib/c/c_structs.h
b/pulsar-client-cpp/lib/c/c_structs.h
index a1fe285..eb8889a 100644
--- a/pulsar-client-cpp/lib/c/c_structs.h
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -75,7 +75,7 @@ struct _pulsar_topic_metadata {
typedef void (*pulsar_result_callback)(pulsar_result res, void* ctx);
-static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback, void* ctx) {
+inline void handle_result_callback(pulsar::Result result, pulsar_result_callback callback, void* ctx) {
if (callback) {
callback((pulsar_result)result, ctx);
}
diff --git a/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc
b/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc
index 14e769f..1ed88e8 100644
--- a/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc
+++ b/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc
@@ -72,15 +72,16 @@ static bool has_pclmulqdq = false;
bool crc32c_initialize() {
if (!initialized) {
+#ifdef _MSC_VER
const uint32_t cpuid_ecx_sse42 = (1 << 20);
const uint32_t cpuid_ecx_pclmulqdq = (1 << 1);
-
-#ifdef _MSC_VER
int CPUInfo[4] = {};
__cpuid(CPUInfo, 1);
has_sse42 = (CPUInfo[2] & cpuid_ecx_sse42) != 0;
has_pclmulqdq = (CPUInfo[2] & cpuid_ecx_pclmulqdq) != 0;
#elif BOOST_ARCH_X86_64
+ const uint32_t cpuid_ecx_sse42 = (1 << 20);
+ const uint32_t cpuid_ecx_pclmulqdq = (1 << 1);
unsigned int eax, ebx, ecx, edx;
if (__get_cpuid(1, &eax, &ebx, &ecx, &edx)) {
has_sse42 = (ecx & cpuid_ecx_sse42) != 0;
diff --git a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
index 3acdc93..38534a6 100644
--- a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
+++ b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
@@ -30,24 +30,17 @@ ConsumerStatsImpl::ConsumerStatsImpl(std::string
consumerStr, ExecutorServicePtr
: consumerStr_(consumerStr),
executor_(executor),
timer_(executor_->createDeadlineTimer()),
- statsIntervalInSeconds_(statsIntervalInSeconds),
- receivedMsgMap_(),
- ackedMsgMap_(),
- totalReceivedMsgMap_(),
- totalAckedMsgMap_(),
- totalNumBytesRecieved_(0),
- numBytesRecieved_(0),
- mutex_() {
+ statsIntervalInSeconds_(statsIntervalInSeconds) {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset,
this, std::placeholders::_1));
}
ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats)
: consumerStr_(stats.consumerStr_),
- totalNumBytesRecieved_(stats.totalNumBytesRecieved_),
numBytesRecieved_(stats.numBytesRecieved_),
receivedMsgMap_(stats.receivedMsgMap_),
ackedMsgMap_(stats.ackedMsgMap_),
+ totalNumBytesRecieved_(stats.totalNumBytesRecieved_),
totalReceivedMsgMap_(stats.totalReceivedMsgMap_),
totalAckedMsgMap_(stats.totalAckedMsgMap_),
statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {}
diff --git a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
index 8afb924..5607cce 100644
--- a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
@@ -28,16 +28,16 @@ namespace pulsar {
class ConsumerStatsImpl : public ConsumerStatsBase {
private:
- unsigned long numBytesRecieved_;
+ std::string consumerStr_;
+
+ unsigned long numBytesRecieved_ = 0;
std::map<Result, unsigned long> receivedMsgMap_;
std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long>
ackedMsgMap_;
- unsigned long totalNumBytesRecieved_;
+ unsigned long totalNumBytesRecieved_ = 0;
std::map<Result, unsigned long> totalReceivedMsgMap_;
std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long>
totalAckedMsgMap_;
- std::string consumerStr_;
-
ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
std::mutex mutex_;
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
index 25d150d..af7ae4b 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
@@ -43,34 +43,27 @@ std::string ProducerStatsImpl::latencyToString(const
LatencyAccumulator& obj) {
ProducerStatsImpl::ProducerStatsImpl(std::string producerStr,
ExecutorServicePtr executor,
unsigned int statsIntervalInSeconds)
- : numMsgsSent_(0),
- numBytesSent_(0),
- totalMsgsSent_(0),
- totalBytesSent_(0),
+ : producerStr_(producerStr),
+ latencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
+ totalLatencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
executor_(executor),
timer_(executor->createDeadlineTimer()),
- producerStr_(producerStr),
- statsIntervalInSeconds_(statsIntervalInSeconds),
- mutex_(),
- latencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
- totalLatencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs) {
+ statsIntervalInSeconds_(statsIntervalInSeconds) {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset,
this, std::placeholders::_1));
}
ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
- : numMsgsSent_(stats.numMsgsSent_),
+ : producerStr_(stats.producerStr_),
+ numMsgsSent_(stats.numMsgsSent_),
numBytesSent_(stats.numBytesSent_),
+ sendMap_(stats.sendMap_),
+ latencyAccumulator_(stats.latencyAccumulator_),
totalMsgsSent_(stats.totalMsgsSent_),
totalBytesSent_(stats.totalBytesSent_),
- sendMap_(stats.sendMap_),
totalSendMap_(stats.totalSendMap_),
- timer_(),
- producerStr_(stats.producerStr_),
- statsIntervalInSeconds_(stats.statsIntervalInSeconds_),
- mutex_(),
- latencyAccumulator_(stats.latencyAccumulator_),
- totalLatencyAccumulator_(stats.totalLatencyAccumulator_) {}
+ totalLatencyAccumulator_(stats.totalLatencyAccumulator_),
+ statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {}
void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
if (ec) {
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
index 3fdd9fd..e82628b 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
@@ -50,18 +50,18 @@ typedef boost::accumulators::accumulator_set<
class ProducerStatsImpl : public
std::enable_shared_from_this<ProducerStatsImpl>, public ProducerStatsBase {
private:
- unsigned long numMsgsSent_;
- unsigned long numBytesSent_;
+ std::string producerStr_;
+
+ unsigned long numMsgsSent_ = 0;
+ unsigned long numBytesSent_ = 0;
std::map<Result, unsigned long> sendMap_;
LatencyAccumulator latencyAccumulator_;
- unsigned long totalMsgsSent_;
- unsigned long totalBytesSent_;
+ unsigned long totalMsgsSent_ = 0;
+ unsigned long totalBytesSent_ = 0;
std::map<Result, unsigned long> totalSendMap_;
LatencyAccumulator totalLatencyAccumulator_;
- std::string producerStr_;
-
ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
std::mutex mutex_;
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh
b/pulsar-client-cpp/pulsar-test-service-start.sh
index d6da230..248d628 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -31,7 +31,7 @@ if [ -f /.dockerenv ]; then
rm -rf $PULSAR_DIR
mkdir $PULSAR_DIR
TGZ=$(ls -1 $SRC_DIR/distribution/server/target/apache-pulsar*bin.tar.gz |
head -1)
- tar xfz $TGZ -C $PULSAR_DIR --strip-components 1
+ tar -xzf $TGZ -C $PULSAR_DIR --strip-components 1
else
export PULSAR_DIR=$SRC_DIR
fi
diff --git a/pulsar-client-cpp/python/CMakeLists.txt
b/pulsar-client-cpp/python/CMakeLists.txt
index c110d01..f7d4069 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -40,7 +40,7 @@ if (NOT APPLE AND NOT MSVC)
endif()
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
- set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -undefined dynamic_lookup")
+ set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -Qunused-arguments -undefined dynamic_lookup")
endif()
# Newer boost versions don't use the -mt suffix
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index b7d265f..8db53bd 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -372,11 +372,24 @@ class PulsarTest(TestCase):
producer = client.create_producer(topic=topic,
encryption_key="client-rsa.pem",
crypto_key_reader=crypto_key_reader)
- producer.send('hello')
+ reader = client.create_reader(topic=topic,
+ start_message_id=MessageId.earliest,
+ crypto_key_reader=crypto_key_reader)
+ producer.send(b'hello')
msg = consumer.receive(TM)
self.assertTrue(msg)
- self.assertEqual(msg.value(), 'hello')
+ self.assertEqual(msg.value(), b'hello')
consumer.unsubscribe()
+
+ msg = reader.read_next(TM)
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b'hello')
+
+ with self.assertRaises(pulsar.Timeout):
+ reader.read_next(100)
+
+ reader.close()
+
client.close()
def test_tls_auth3(self):
diff --git a/pulsar-client-cpp/python/src/authentication.cc
b/pulsar-client-cpp/python/src/authentication.cc
index 00f5848..e236c7e 100644
--- a/pulsar-client-cpp/python/src/authentication.cc
+++ b/pulsar-client-cpp/python/src/authentication.cc
@@ -61,7 +61,7 @@ struct TokenSupplierWrapper {
std::string token;
try {
token = py::call<std::string>(_pySupplier);
- } catch (py::error_already_set e) {
+ } catch(const py::error_already_set& e) {
PyErr_Print();
}
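
Note on the catch-by-const-reference changes in the Python wrapper: catching a polymorphic exception type by value copies it and can slice off the derived part. GCC's -Wcatch-value, which -Wall enables at level 1, reports this, and it is a plausible trigger for these edits (the diff itself does not name the warning). The sketch below uses hypothetical exception types rather than py::error_already_set. The pragma only suppresses that warning so the by-value case compiles under strict flags.

```cpp
#include <cassert>
#include <string>

// Hypothetical polymorphic exception hierarchy for illustration.
struct BaseError {
    virtual ~BaseError() = default;
    virtual std::string name() const { return "BaseError"; }
};

struct DerivedError : BaseError {
    std::string name() const override { return "DerivedError"; }
};

#if defined(__GNUC__) && !defined(__clang__) && __GNUC__ >= 8
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wcatch-value"
#endif
// Catching by value copy-constructs a BaseError from the thrown object,
// so the DerivedError part is sliced away.
inline std::string caughtByValue() {
    std::string seen;
    try {
        throw DerivedError();
    } catch (BaseError e) {
        seen = e.name();
    }
    return seen;
}
#if defined(__GNUC__) && !defined(__clang__) && __GNUC__ >= 8
#pragma GCC diagnostic pop
#endif

// Catching by const reference binds to the thrown object itself, so
// virtual dispatch still reaches the derived override.
inline std::string caughtByConstRef() {
    std::string seen;
    try {
        throw DerivedError();
    } catch (const BaseError& e) {
        seen = e.name();
    }
    return seen;
}
```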
diff --git a/pulsar-client-cpp/python/src/config.cc
b/pulsar-client-cpp/python/src/config.cc
index f9b9c3c..a287248 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -50,7 +50,7 @@ struct ListenerWrapper {
try {
py::call<void>(_pyListener, py::object(&consumer),
py::object(&msg));
- } catch (py::error_already_set e) {
+ } catch (const py::error_already_set& e) {
PyErr_Print();
}
@@ -91,6 +91,13 @@ static ProducerConfiguration&
ProducerConfiguration_setCryptoKeyReader(ProducerC
return conf;
}
+static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf,
+ py::object cryptoKeyReader) {
+ CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
+ conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
+ return conf;
+}
+
class LoggerWrapper: public Logger {
PyObject* const _pyLogger;
const int _pythonLogLevel;
@@ -144,7 +151,8 @@ class LoggerWrapper: public Logger {
py::call_method<void>(_pyLogger, "error",
message.c_str());
break;
}
- } catch (py::error_already_set e) {
+
+ } catch (const py::error_already_set& e) {
_fallbackLogger->log(level, line, message);
}
@@ -218,7 +226,7 @@ void export_config() {
.def("log_conf_file_path",
&ClientConfiguration::setLogConfFilePath, return_self<>())
.def("use_tls", &ClientConfiguration::isUseTls)
.def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
- .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath)
+ .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath, return_value_policy<copy_const_reference>())
.def("tls_trust_certs_file_path",
&ClientConfiguration::setTlsTrustCertsFilePath, return_self<>())
.def("tls_allow_insecure_connection",
&ClientConfiguration::isTlsAllowInsecureConnection)
.def("tls_allow_insecure_connection",
&ClientConfiguration::setTlsAllowInsecureConnection, return_self<>())
@@ -304,5 +312,6 @@ void export_config() {
.def("subscription_role_prefix",
&ReaderConfiguration::setSubscriptionRolePrefix)
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
+ .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>())
;
}
diff --git a/pulsar-client-cpp/python/src/message.cc
b/pulsar-client-cpp/python/src/message.cc
index 460a0c7..8532966 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -73,7 +73,7 @@ boost::python::object Message_properties(const Message& msg) {
for (const auto& item : msg.getProperties()) {
pyProperties[item.first] = item.second;
}
- return pyProperties;
+ return boost::python::object(std::move(pyProperties));
}
std::string Topic_name_str(const Message& msg) {
diff --git a/pulsar-client-cpp/python/src/producer.cc
b/pulsar-client-cpp/python/src/producer.cc
index c50eac1..343650f 100644
--- a/pulsar-client-cpp/python/src/producer.cc
+++ b/pulsar-client-cpp/python/src/producer.cc
@@ -42,7 +42,7 @@ void Producer_sendAsyncCallback(PyObject* callback, Result
res, const MessageId&
try {
py::call<void>(callback, res, py::object(&msgId));
- } catch (py::error_already_set e) {
+ } catch (const py::error_already_set& e) {
PyErr_Print();
}
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc
b/pulsar-client-cpp/tests/AuthPluginTest.cc
index b13505e..be987e0 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -354,15 +354,14 @@ TEST(AuthPluginTest, testOauth2) {
}
TEST(AuthPluginTest, testOauth2WrongSecret) {
- try {
- pulsar::AuthenticationDataPtr data;
+ pulsar::AuthenticationDataPtr data;
- std::string params = R"({
- "type": "client_credentials",
- "issuer_url": "https://dev-kt-aa9ne.us.auth0.com",
- "client_id": "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
- "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ",
- "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
+ std::string params = R"({
+ "type": "client_credentials",
+ "issuer_url": "https://dev-kt-aa9ne.us.auth0.com",
+ "client_id": "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+ "client_secret": "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ",
+ "audience": "https://dev-kt-aa9ne.us.auth0.com/api/v2/"})";
LOG_INFO("PARAMS: " << params);
pulsar::AuthenticationPtr auth = pulsar::AuthOauth2::create(params);
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index f3ff78a..be65775 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -56,7 +56,6 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
std::mutex mutex_;
-static int globalTestBatchMessagesCounter = 0;
static int globalCount = 0;
static long globalResendMessageCount = 0;
std::string lookupUrl = "pulsar://localhost:6650";
@@ -1297,6 +1296,8 @@ void testHandlerReconnectionPartitionProducers(bool
lazyStartPartitionedProducer
std::string url = adminUrl + "admin/v2/persistent/public/default/" +
topicName + "/partitions";
int res = makePutRequest(url, "1");
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);
ProducerConfiguration producerConf;
producerConf.setSendTimeout(10000);
@@ -3275,8 +3276,7 @@ TEST(BasicEndToEndTest,
testRegexTopicsWithInitialPosition) {
for (int i = 0; i < 10; i++) {
Message msg;
- Result res = consumer.receive(msg);
- ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(ResultOk, consumer.receive(msg));
}
client.close();
@@ -3453,7 +3453,7 @@ TEST(BasicEndToEndTest, testSendCallback) {
std::set<MessageId> sentIdSet;
for (int i = 0; i < 100; i++) {
const auto msg = MessageBuilder().setContent("a").build();
- producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
+ producer.sendAsync(msg, [&sentIdSet, &latch](Result result, const MessageId &id) {
ASSERT_EQ(ResultOk, result);
sentIdSet.emplace(id);
latch.countdown();
@@ -3497,7 +3497,7 @@ TEST(BasicEndToEndTest, testSendCallback) {
latch = Latch(numMessages);
for (int i = 0; i < numMessages; i++) {
const auto msg = MessageBuilder().setContent("a").build();
- producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) {
+ producer.sendAsync(msg, [&sentIdSet, &latch](Result result, const MessageId &id) {
ASSERT_EQ(ResultOk, result);
sentIdSet.emplace(id);
latch.countdown();
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index fa60c39..ebe6bf1 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -43,17 +43,11 @@ DECLARE_LOG_OBJECT();
using namespace pulsar;
-static int globalCount = 0;
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
// ecpoch time in seconds
-long epochTime = time(NULL);
-
-static void messageListenerFunction(Consumer consumer, const Message& msg) {
- globalCount++;
- consumer.acknowledge(msg);
-}
+const long epochTime = time(NULL);
class MessageCountSendCallback {
public:
@@ -942,7 +936,6 @@ TEST(BatchMessageTest, producerFailureResult) {
Producer producer;
int batchSize = 100;
- int numOfMessages = 10000;
ProducerConfiguration conf;
conf.setCompressionType(CompressionZLib);
@@ -1077,4 +1070,4 @@ TEST(BatchMessageTest, testProducerQueueWithBatches) {
}
ASSERT_EQ(rejectedMessges, 10);
-}
\ No newline at end of file
+}
diff --git a/pulsar-client-cpp/tests/ClientTest.cc
b/pulsar-client-cpp/tests/ClientTest.cc
index 10b5b32..8f5e68b 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -69,6 +69,7 @@ TEST(ClientTest, testSwHwChecksum) {
uint32_t hwIncrementalChecksum = crc32cHw(hwChecksum1, (char
*)data.c_str(), data.length());
// (b.1) sw: checksum on full data
uint32_t swDoubleChecksum = crc32cSw(0, (char *)doubleData.c_str(),
doubleData.length());
+ ASSERT_EQ(swDoubleChecksum, hwDoubleChecksum);
// (b.2) sw: incremental checksum on multiple partial data
swChecksum1 = crc32cHw(0, (char *)data.c_str(), data.length());
uint32_t swIncrementalChecksum = crc32cSw(swChecksum1, (char
*)data.c_str(), data.length());
@@ -174,4 +175,4 @@ TEST(ClientTest, testGetNumberOfReferences) {
ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
client.close();
-}
\ No newline at end of file
+}
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 3ab23f7..99c7cfa 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -272,9 +272,7 @@ TEST(ConsumerConfigurationTest,
testSubscriptionInitialPosition) {
}
TEST(ConsumerConfigurationTest, testResetAckTimeOut) {
- Result result;
-
- uint64_t milliSeconds = 50000;
+ const uint64_t milliSeconds = 50000;
ConsumerConfiguration config;
config.setUnAckedMessagesTimeoutMs(milliSeconds);
ASSERT_EQ(milliSeconds, config.getUnAckedMessagesTimeoutMs());
diff --git a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
index b77b2b3..7b39554 100644
--- a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
+++ b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc
@@ -164,9 +164,11 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) {
std::atomic_int numMessageSent{0};
// messages with no key will use a batch with an empty string as key
for (int i = 0; i < numMessages; i++) {
- producer_.sendAsync(
- MessageBuilder().setContent("x").build(),
- [&numMessageSent](Result result, const MessageId&) { ASSERT_EQ(result, ResultOk); });
+ producer_.sendAsync(MessageBuilder().setContent("x").build(),
+ [&numMessageSent](Result result, const MessageId&) {
+ ASSERT_EQ(result, ResultOk);
+ ++numMessageSent;
+ });
}
Message msg;
@@ -174,4 +176,5 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) {
receiveAndAck(msg);
}
ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000));
+ ASSERT_EQ(numMessageSent.load(), numMessages);
}