This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch MINIFICPP-1507 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f177e5d46f9ce793b9d3c2727cba7da2ae0686ac Author: Marton Szasz <[email protected]> AuthorDate: Wed Mar 17 10:31:36 2021 +0100 review feedback, fix linter script on mac --- extensions/script/python/PyBaseStream.cpp | 2 +- extensions/sftp/client/SFTPClient.cpp | 2 +- .../standard-processors/processors/ExtractText.cpp | 2 +- libminifi/include/sitetosite/SiteToSiteClient.h | 38 ++-- libminifi/src/FlowFileRecord.cpp | 88 ++++++---- libminifi/src/provenance/Provenance.cpp | 194 +++++++++++++-------- libminifi/src/sitetosite/RawSocketProtocol.cpp | 57 +++--- libminifi/src/sitetosite/SiteToSiteClient.cpp | 34 ++-- thirdparty/google-styleguide/run_linter.sh | 4 +- 9 files changed, 250 insertions(+), 171 deletions(-) diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp index 1733994..5e583f7 100644 --- a/extensions/script/python/PyBaseStream.cpp +++ b/extensions/script/python/PyBaseStream.cpp @@ -50,7 +50,7 @@ py::bytes PyBaseStream::read(size_t len) { std::vector<uint8_t> buffer(len); const auto read = stream_->read(buffer.data(), len); - return py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read)); + return py::bytes(reinterpret_cast<char *>(buffer.data()), read); } size_t PyBaseStream::write(const py::bytes& buf) { diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp index 8d528ab..1e76cb8 100644 --- a/extensions/sftp/client/SFTPClient.cpp +++ b/extensions/sftp/client/SFTPClient.cpp @@ -577,7 +577,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov } logger_->log_trace("Read %d bytes", read_ret); total_read += read_ret; - auto remaining = gsl::narrow<size_t>(read_ret); + auto remaining = read_ret; while (remaining > 0) { const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining); if (write_ret < 0) { diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp index 888ec47..0000b84 100644 --- a/extensions/standard-processors/processors/ExtractText.cpp +++ b/extensions/standard-processors/processors/ExtractText.cpp @@ -161,7 +161,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream> bool repeatingcapture; ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture); - const size_t maxCaptureSize = [this]{ + const size_t maxCaptureSize = [this] { uint64_t val; ctx_->getProperty(MaxCaptureGroupLen.getName(), val); return size_t{val}; diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index be1d327..36c3d96 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -45,15 +45,14 @@ namespace sitetosite { */ class DataPacket { public: - DataPacket(const std::shared_ptr<logging::Logger> &logger, const std::shared_ptr<Transaction> &transaction, std::map<std::string, std::string> attributes, const std::string &payload) - : payload_(payload), - logger_reference_(logger) { - _size = 0; - transaction_ = transaction; - _attributes = attributes; + DataPacket(std::shared_ptr<logging::Logger> logger, std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> attributes, const std::string &payload) + : _attributes{std::move(attributes)}, + transaction_{std::move(transaction)}, + payload_{payload}, + logger_reference_{std::move(logger)} { } std::map<std::string, std::string> _attributes; - uint64_t _size; + uint64_t _size{0}; std::shared_ptr<Transaction> transaction_; const std::string & payload_; std::shared_ptr<logging::Logger> logger_reference_; @@ -62,16 +61,7 @@ class DataPacket { class SiteToSiteClient : public core::Connectable { public: SiteToSiteClient() - : core::Connectable("SitetoSiteClient"), - peer_state_(IDLE), - _batchSendNanos(5000000000), - ssl_context_service_(nullptr), - logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) { - _currentVersion = _supportedVersion[0]; - _currentVersionIndex = 0; - _supportedCodecVersion[0] = 1; - _currentCodecVersion = _supportedCodecVersion[0]; - _currentCodecVersionIndex = 0; + : core::Connectable("SitetoSiteClient") { } ~SiteToSiteClient() override = default; @@ -240,7 +230,7 @@ class SiteToSiteClient : public core::Connectable { } // Peer State - PeerState peer_state_; + PeerState peer_state_{PeerState::IDLE}; // portId utils::Identifier port_id_; @@ -257,22 +247,22 @@ class SiteToSiteClient : public core::Connectable { std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_; // BATCH_SEND_NANOS - uint64_t _batchSendNanos; + uint64_t _batchSendNanos{5000000000}; /*** * versioning */ uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1}; - uint32_t _currentVersion; - int _currentVersionIndex; + int _currentVersionIndex{0}; + uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]}; uint32_t _supportedCodecVersion[1] = {1}; - uint32_t _currentCodecVersion; - int _currentCodecVersionIndex; + int _currentCodecVersionIndex{0}; + uint32_t _currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]}; std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; private: - std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<SiteToSiteClient>::getLogger()}; }; // Nest Callback Class for write stream diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 047c571..f7b1638 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -163,66 +163,88 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) { auto file = std::make_shared<FlowFileRecord>(); - auto ret = inStream.read(file->event_time_); - if (ret != 8) { - return {}; + { + const auto ret = inStream.read(file->event_time_); + if (ret != 8) { + return {}; + } } - ret = inStream.read(file->entry_date_); - if (ret != 8) { - return {}; + { + const auto ret = inStream.read(file->entry_date_); + if (ret != 8) { + return {}; + } } - ret = inStream.read(file->lineage_start_date_); - if (ret != 8) { - return {}; + { + const auto ret = inStream.read(file->lineage_start_date_); + if (ret != 8) { + return {}; + } } - ret = inStream.read(file->uuid_); - if (ret == static_cast<size_t>(-1)) { - return {}; + { + const auto ret = inStream.read(file->uuid_); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return {}; + } } - ret = inStream.read(container); - if (ret == static_cast<size_t>(-1)) { - return {}; + { + const auto ret = inStream.read(container); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return {}; + } } // read flow attributes uint32_t numAttributes = 0; - ret = inStream.read(numAttributes); - if (ret != 4) { - return {}; + { + const auto ret = inStream.read(numAttributes); + if (ret != 4) { + return {}; + } } for (uint32_t i = 0; i < numAttributes; i++) { std::string key; - ret = inStream.read(key, true); - if (ret == static_cast<size_t>(-1)) { - return {}; + { + const auto ret = inStream.read(key, true); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return {}; + } } std::string value; - ret = inStream.read(value, true); - if (ret == static_cast<size_t>(-1)) { - return {}; + { + const auto ret = inStream.read(value, true); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return {}; + } } file->attributes_[key] = value; } std::string content_full_path; - ret = inStream.read(content_full_path); - if (ret == static_cast<size_t>(-1)) { - return {}; + { + const auto ret = inStream.read(content_full_path); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return {}; + } } - ret = inStream.read(file->size_); - if (ret != 8) { - return {}; + { + const auto ret = inStream.read(file->size_); + if (ret != 8) { + return {}; + } } - ret = inStream.read(file->offset_); - if (ret != 8) { - return {}; + { + const auto ret = inStream.read(file->offset_); + if (ret != 8) { + return {}; + } } file->claim_ = std::make_shared<ResourceClaim>(content_full_path, content_repo); diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index 2fac08b..0dd9cfc 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -237,141 +237,189 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) { org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize)); - auto ret = outStream.read(uuid_); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(uuid_); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } uint32_t eventType; - ret = outStream.read(eventType); - if (ret != 4) { - return false; + { + const auto ret = outStream.read(eventType); + if (ret != 4) { + return false; + } } - this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType; - ret = outStream.read(this->_eventTime); - if (ret != 8) { - return false; + this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType; + { + const auto ret = outStream.read(this->_eventTime); + if (ret != 8) { + return false; + } } - ret = outStream.read(this->_entryDate); - if (ret != 8) { - return false; + { + const auto ret = outStream.read(this->_entryDate); + if (ret != 8) { + return false; + } } - ret = outStream.read(this->_eventDuration); - if (ret != 8) { - return false; + { + const auto ret = outStream.read(this->_eventDuration); + if (ret != 8) { + return false; + } } - ret = outStream.read(this->_lineageStartDate); - if (ret != 8) { - return false; + { + const auto ret = outStream.read(this->_lineageStartDate); + if (ret != 8) { + return false; + } } - ret = outStream.read(this->_componentId); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_componentId); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } - ret = outStream.read(this->_componentType); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_componentType); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } - ret = outStream.read(this->flow_uuid_); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->flow_uuid_); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } - ret = outStream.read(this->_details); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_details); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } // read flow attributes uint32_t numAttributes = 0; - ret = outStream.read(numAttributes); - if (ret != 4) { - return false; + { + const auto ret = outStream.read(numAttributes); + if (ret != 4) { + return false; + } } for (uint32_t i = 0; i < numAttributes; i++) { std::string key; - ret = outStream.read(key); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(key); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } std::string value; - ret = outStream.read(value); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(value); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } this->_attributes[key] = value; } - ret = outStream.read(this->_contentFullPath); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_contentFullPath); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } - ret = outStream.read(this->_size); - if (ret != 8) { - return false; + { + const auto ret = outStream.read(this->_size); + if (ret != 8) { + return false; + } } - ret = outStream.read(this->_offset); - if (ret != 8) { - return false; + { + const auto ret = outStream.read(this->_offset); + if (ret != 8) { + return false; + } } - ret = outStream.read(this->_sourceQueueIdentifier); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_sourceQueueIdentifier); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) { // read UUIDs uint32_t number = 0; - ret = outStream.read(number); - if (ret != 4) { - return false; + { + const auto ret = outStream.read(number); + if (ret != 4) { + return false; + } } for (uint32_t i = 0; i < number; i++) { utils::Identifier parentUUID; - ret = outStream.read(parentUUID); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(parentUUID); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } this->addParentUuid(parentUUID); } number = 0; - ret = outStream.read(number); - if (ret != 4) { - return false; + { + const auto ret = outStream.read(number); + if (ret != 4) { + return false; + } } for (uint32_t i = 0; i < number; i++) { utils::Identifier childUUID; - ret = outStream.read(childUUID); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(childUUID); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } this->addChildUuid(childUUID); } } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { - ret = outStream.read(this->_transitUri); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_transitUri); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { - ret = outStream.read(this->_transitUri); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_transitUri); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } - ret = outStream.read(this->_sourceSystemFlowFileIdentifier); - if (ret == static_cast<size_t>(-1)) { - return false; + { + const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } } diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp index c75eb17..9dce59d 100644 --- a/libminifi/src/sitetosite/RawSocketProtocol.cpp +++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp @@ -357,37 +357,46 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) { } uint32_t number; - auto ret = peer_->read(number); - - if (ret == static_cast<size_t>(-1)) { - tearDown(); - return false; + { + const auto ret = peer_->read(number); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + tearDown(); + return false; + } } for (uint32_t i = 0; i < number; i++) { std::string host; - ret = peer_->read(host); - if (ret == static_cast<size_t>(-1)) { - tearDown(); - return false; + { + const auto ret = peer_->read(host); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + tearDown(); + return false; + } } uint32_t port; - ret = peer_->read(port); - if (ret == static_cast<size_t>(-1)) { - tearDown(); - return false; + { + const auto ret = peer_->read(port); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + tearDown(); + return false; + } } uint8_t secure; - ret = peer_->read(secure); - if (ret == static_cast<size_t>(-1)) { - tearDown(); - return false; + { + const auto ret = peer_->read(secure); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + tearDown(); + return false; + } } uint32_t count; - ret = peer_->read(count); - if (ret == static_cast<size_t>(-1)) { - tearDown(); - return false; + { + const auto ret = peer_->read(count); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + tearDown(); + return false; + } } PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true); peers.push_back(std::move(peer_status)); @@ -413,13 +422,13 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) { std::string requestTypeStr; const auto ret = peer_->read(requestTypeStr); - if (ret == static_cast<size_t>(-1)) - return gsl::narrow_cast<int>(ret); + if (ret == 0 || ret == static_cast<size_t>(-1)) + return static_cast<int>(ret); for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) { if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) { type = (RequestType) i; - return gsl::narrow_cast<int>(ret); + return static_cast<int>(ret); } } diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index b009c18..7e4a38d 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -42,7 +42,7 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac uint8_t thirdByte; ret = peer_->read(thirdByte); - if (ret == static_cast<size_t>(-1)) + if (ret == 0 || ret == static_cast<size_t>(-1)) return gsl::narrow_cast<int>(ret); code = (RespondCode) thirdByte; @@ -582,9 +582,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke // start to read the packet uint32_t numAttributes; - auto ret = transaction->getStream().read(numAttributes); - if (ret != static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) { - return false; + { + const auto ret = transaction->getStream().read(numAttributes); + if (ret == 0 || ret == static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) { + return false; + } } // read the attributes @@ -592,22 +594,28 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke for (unsigned int i = 0; i < numAttributes; i++) { std::string key; std::string value; - ret = transaction->getStream().read(key, true); - if (ret != static_cast<size_t>(-1)) { - return false; + { + const auto ret = transaction->getStream().read(key, true); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } - ret = transaction->getStream().read(value, true); - if (ret != static_cast<size_t>(-1)) { - return false; + { + const auto ret = transaction->getStream().read(value, true); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } packet->_attributes[key] = value; logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.to_string(), key, value); } uint64_t len; - ret = transaction->getStream().read(len); - if (ret != static_cast<size_t>(-1)) { - return false; + { + const auto ret = transaction->getStream().read(len); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } packet->_size = len; diff --git a/thirdparty/google-styleguide/run_linter.sh b/thirdparty/google-styleguide/run_linter.sh index ca0a1ad..5015c94 100755 --- a/thirdparty/google-styleguide/run_linter.sh +++ b/thirdparty/google-styleguide/run_linter.sh @@ -40,5 +40,7 @@ done HEADERS=`find $INCLUDE_DIRS -name '*.h' | sort | uniq | tr '\n' ' '` SOURCES=`find $SOURCE_DIRS -name '*.cpp' | sort | uniq | tr '\n' ' '` -REPOSITORY="$(realpath --physical "$(dirname "$0")/../..")" +# this realpath alternative should work on mac +alias prealpath="python -c 'import os, sys; print(os.path.realpath(sys.argv[1]))'" +REPOSITORY="$(python -c 'import os, sys; print(os.path.realpath(sys.argv[1] + "../.."))' $(dirname "$0"))" python ${SCRIPT_DIR}/cpplint.py --linelength=200 --repository="$REPOSITORY" ${HEADERS} ${SOURCES}
