fgerlits commented on a change in pull request #1083:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1083#discussion_r648970942
##########
File path: .gitignore
##########
@@ -64,6 +64,8 @@ __pycache__/
/content_repository
/provenance_repository
/logs
+/build*
+/cmake-build*
Review comment:
doesn't `/*build*` in line 50 cover these already?
##########
File path: extensions/mqtt/processors/ConsumeMQTT.h
##########
@@ -80,10 +81,13 @@ class ConsumeMQTT : public
processors::AbstractMQTTProcessor {
}
MQTTClient_message *message_;
int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
- int64_t len =
stream->write(reinterpret_cast<uint8_t*>(message_->payload),
message_->payloadlen);
- if (len < 0)
+ if (message_->payloadlen < 0) return -1;
Review comment:
should we set `status_ = -1` in this case, too?
##########
File path: libminifi/include/io/Stream.h
##########
@@ -26,13 +26,13 @@ namespace io {
constexpr size_t STREAM_ERROR = static_cast<size_t>(-1);
-inline bool isError(const size_t read_return) noexcept {
- return read_return == STREAM_ERROR // general error
- || read_return == static_cast<size_t>(-2); // Socket EAGAIN, to be
refactored to eliminate this error condition
+inline bool isError(const size_t read_write_return) noexcept {
+ return read_write_return == STREAM_ERROR // general error
+ || read_write_return == static_cast<size_t>(-2); // read: Socket
EAGAIN, to be refactored to eliminate this error condition
}
-inline bool isError(const int write_return) noexcept {
- return write_return == -1;
+inline bool isError(const int legacy_write_return) noexcept {
+ return legacy_write_return == -1;
Review comment:
is this still needed?
##########
File path: libminifi/src/serialization/FlowFileV3Serializer.cpp
##########
@@ -27,85 +27,78 @@ namespace minifi {
constexpr uint8_t FlowFileV3Serializer::MAGIC_HEADER[];
-int FlowFileV3Serializer::writeLength(std::size_t length, const
std::shared_ptr<io::OutputStream>& out) {
+size_t FlowFileV3Serializer::writeLength(std::size_t length, const
std::shared_ptr<io::OutputStream>& out) {
if (length < MAX_2_BYTE_VALUE) {
return out->write(static_cast<uint16_t>(length));
}
- int sum = 0;
- int ret;
- ret = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE));
- if (ret < 0) {
- return ret;
+ size_t sum = 0;
+ {
+ const auto ret = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE));
+ if (io::isError(ret)) return ret;
+ sum += ret;
}
- sum += ret;
- ret = out->write(static_cast<uint32_t>(length));
- if (ret < 0) {
- return ret;
+ {
+ const auto ret = out->write(static_cast<uint32_t>(length));
+ if (io::isError(ret)) return ret;
+ sum += ret;
}
- sum += ret;
return sum;
}
-int FlowFileV3Serializer::writeString(const std::string &str, const
std::shared_ptr<io::OutputStream> &out) {
- int sum = 0;
- int ret;
- ret = writeLength(str.length(), out);
- if (ret < 0) {
- return ret;
- }
- sum += ret;
- ret = out->write(const_cast<uint8_t*>(reinterpret_cast<const
uint8_t*>(str.data())), gsl::narrow<int>(str.length()));
- if (ret < 0) {
- return ret;
+size_t FlowFileV3Serializer::writeString(const std::string &str, const
std::shared_ptr<io::OutputStream> &out) {
+ size_t sum = 0;
+ {
+ const auto ret = writeLength(str.length(), out);
+ if (io::isError(ret)) return ret;
+ sum += ret;
}
- if (gsl::narrow<size_t>(ret) != str.length()) {
- return -1;
+ {
+ const auto ret = out->write(reinterpret_cast<const uint8_t*>(str.data()),
str.length());
+ if (io::isError(ret)) return ret;
+ if (ret != str.length()) return io::STREAM_ERROR;
+ sum += ret;
}
- sum += ret;
return sum;
}
int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>&
flowFile, const std::shared_ptr<io::OutputStream>& out) {
- int sum = 0;
- int ret;
- ret = out->write(const_cast<uint8_t*>(MAGIC_HEADER), sizeof(MAGIC_HEADER));
- if (ret < 0) {
- return ret;
- }
- if (ret != sizeof(MAGIC_HEADER)) {
- return -1;
+ size_t sum = 0;
+ {
+ const auto ret = out->write(MAGIC_HEADER, sizeof(MAGIC_HEADER));
+ if (io::isError(ret)) return -1;
+ if (ret != sizeof(MAGIC_HEADER)) return -1;
+ sum += ret;
}
- sum += ret;
const auto& attributes = flowFile->getAttributes();
- ret = writeLength(attributes.size(), out);
- if (ret < 0) {
- return ret;
+ {
+ const auto ret = writeLength(attributes.size(), out);
+ if (io::isError(ret)) return -1;
+ sum += ret;
}
- sum += ret;
for (const auto& attrIt : attributes) {
- ret = writeString(attrIt.first, out);
- if (ret < 0) {
- return ret;
+ {
+ const auto ret = writeString(attrIt.first, out);
+ if (io::isError(ret)) return -1;
+ sum += ret;
}
- sum += ret;
- ret = writeString(attrIt.second, out);
- if (ret < 0) {
- return ret;
+ {
+ const auto ret = writeString(attrIt.second, out);
+ if (io::isError(ret)) return -1;
+ sum += ret;
}
- sum += ret;
}
- ret = out->write(static_cast<uint64_t>(flowFile->getSize()));
- if (ret < 0) {
- return ret;
+ {
+ const auto ret = out->write(static_cast<uint64_t>(flowFile->getSize()));
+ if (io::isError(ret)) return -1;
+ sum += ret;
}
- sum += ret;
InputStreamPipe pipe(out);
- ret = reader_(flowFile, &pipe);
- if (ret < 0) {
- return ret;
+ {
+ const auto ret = reader_(flowFile, &pipe);
+ if (io::isError(ret)) return -1;
Review comment:
since `ret` is an `int`, I would leave this as `if (ret < 0)`
##########
File path: libminifi/src/sitetosite/SiteToSiteClient.cpp
##########
@@ -435,30 +425,34 @@ int16_t SiteToSiteClient::send(const utils::Identifier
&transactionID, DataPacke
}
if (transaction->current_transfers_ > 0) {
- ret = writeResponse(transaction, CONTINUE_TRANSACTION,
"CONTINUE_TRANSACTION");
- if (ret <= 0) {
+ const auto ret = writeResponse(transaction, CONTINUE_TRANSACTION,
"CONTINUE_TRANSACTION");
+ if (ret == 0 || io::isError(ret)) {
return -1;
}
}
// start to read the packet
- uint32_t numAttributes = gsl::narrow<uint32_t>(packet->_attributes.size());
- ret = transaction->getStream().write(numAttributes);
- if (ret != 4) {
- return -1;
+ {
+ const auto numAttributes =
gsl::narrow<uint32_t>(packet->_attributes.size());
+ const auto ret = transaction->getStream().write(numAttributes);
+ if (ret != 4) {
+ return -1;
+ }
}
- std::map<std::string, std::string>::iterator itAttribute;
- for (itAttribute = packet->_attributes.begin(); itAttribute !=
packet->_attributes.end(); itAttribute++) {
- ret = transaction->getStream().write(itAttribute->first, true);
-
- if (ret <= 0) {
- return -1;
+ for (const auto& _attribute : packet->_attributes) {
Review comment:
minor, but I think the loop variable should be called `attribute`
(without the `_`)
##########
File path: extensions/windows-event-log/CollectorInitiatedSubscription.cpp
##########
@@ -628,17 +628,16 @@ void CollectorInitiatedSubscription::unsubscribe() {
int CollectorInitiatedSubscription::processQueue(const
std::shared_ptr<core::ProcessSession> &session) {
struct WriteCallback: public OutputStreamCallback {
- WriteCallback(const std::string& str)
- : str_(str) {
- status_ = 0;
+ WriteCallback(std::string& str)
Review comment:
why is this no longer `const`? `str_` could be a pointer to `const
std::string`
##########
File path: libminifi/src/io/tls/TLSServerSocket.cpp
##########
@@ -98,8 +100,9 @@ void TLSServerSocket::registerCallback(std::function<bool()>
accept_function, st
int fd_remove = 0;
std::vector<uint8_t> data;
if ( handler(&data, &size) > 0 ) {
- ret = writeData(data.data(), size, fd);
- if (ret < 0) {
+ const auto clamped_size = gsl::narrow<size_t>(utils::clamp(size, 0,
std::numeric_limits<int>::max()));
Review comment:
We could change the handler's signature from `int(std::vector<uint8_t>*,
int*)` to `size_t(std::vector<uint8_t>*, size_t*)`, then we would need neither
the `clamp` nor the `narrow`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]