This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch MINIFICPP-1507 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 5c41fff9c012860cebb8c830c38057b93efef7dc Author: Marton Szasz <[email protected]> AuthorDate: Fri Mar 12 14:10:55 2021 +0100 convert InputStream::read to size_t --- extensions/aws/processors/PutS3Object.h | 11 ++-- extensions/civetweb/processors/ListenHTTP.h | 17 +++--- extensions/http-curl/client/HTTPCallback.h | 2 +- extensions/http-curl/client/HTTPStream.cpp | 9 ++-- extensions/http-curl/client/HTTPStream.h | 4 +- extensions/http-curl/tests/CivetStream.h | 8 ++- extensions/http-curl/tests/HTTPHandlers.h | 9 ++-- extensions/jni/jvm/JniReferenceObjects.h | 3 +- extensions/libarchive/CompressContent.h | 49 +++++++++-------- extensions/libarchive/FocusArchiveEntry.cpp | 9 ++-- extensions/librdkafka/PublishKafka.cpp | 7 ++- extensions/mqtt/processors/ConvertJSONAck.h | 13 ++--- extensions/mqtt/processors/ConvertUpdate.cpp | 2 +- extensions/mqtt/processors/PublishMQTT.h | 18 ++++--- extensions/opc/src/putopc.cpp | 14 ++--- extensions/rocksdb-repos/RocksDbStream.cpp | 35 +++++------- extensions/rocksdb-repos/RocksDbStream.h | 4 +- extensions/script/python/PyBaseStream.cpp | 8 ++- extensions/script/python/PyBaseStream.h | 2 +- extensions/sftp/client/SFTPClient.cpp | 14 ++--- .../standard-processors/processors/ExtractText.cpp | 22 ++++---- .../standard-processors/processors/LogAttribute.h | 20 ++++--- .../standard-processors/processors/PutFile.cpp | 25 ++++----- .../standard-processors/processors/PutFile.h | 2 +- libminifi/include/io/AtomicEntryStream.h | 13 +++-- libminifi/include/io/BufferStream.h | 6 +-- libminifi/include/io/CRCStream.h | 6 +-- libminifi/include/io/ClientSocket.h | 4 +- libminifi/include/io/DescriptorStream.h | 4 +- libminifi/include/io/FileStream.h | 4 +- libminifi/include/io/InputStream.h | 16 +++--- libminifi/include/io/Stream.h | 2 +- libminifi/include/io/StreamPipe.h | 12 ++--- libminifi/include/io/tls/SecureDescriptorStream.h | 4 +- libminifi/include/io/tls/TLSSocket.h | 4 +- libminifi/include/provenance/Provenance.h | 16 +++--- libminifi/include/sitetosite/Peer.h | 2 +- libminifi/include/sitetosite/SiteToSiteClient.h | 55 ++++++++----------- libminifi/include/utils/ByteArrayCallback.h | 7 +-- libminifi/include/utils/Enum.h | 4 +- libminifi/include/utils/FileOutputCallback.h | 2 +- libminifi/src/FlowFileRecord.cpp | 14 +++-- libminifi/src/c2/ControllerSocketProtocol.cpp | 63 +++++++++++----------- libminifi/src/core/ProcessSession.cpp | 10 ++-- libminifi/src/core/ProcessSessionReadCallback.cpp | 13 ++--- libminifi/src/io/BufferStream.cpp | 15 +++--- libminifi/src/io/ClientSocket.cpp | 21 ++++---- libminifi/src/io/DescriptorStream.cpp | 15 +++--- libminifi/src/io/FileStream.cpp | 22 ++++---- libminifi/src/io/InputStream.cpp | 27 +++++----- libminifi/src/io/tls/SecureDescriptorStream.cpp | 41 +++++++------- libminifi/src/io/tls/TLSSocket.cpp | 36 ++++++------- libminifi/src/provenance/Provenance.cpp | 32 ++++++----- libminifi/src/sitetosite/RawSocketProtocol.cpp | 35 ++++++------ libminifi/src/sitetosite/SiteToSiteClient.cpp | 35 ++++++------ libminifi/src/utils/ByteArrayCallback.cpp | 10 ++-- libminifi/src/utils/FileOutputCallback.cpp | 2 +- libminifi/test/BufferReader.h | 10 ++-- .../test/archive-tests/CompressContentTests.cpp | 40 ++++++-------- libminifi/test/archive-tests/MergeFileTests.cpp | 10 ++-- .../test/rocksdb-tests/ContentSessionTests.cpp | 9 ++-- .../rocksdb-tests/DBContentRepositoryTests.cpp | 4 +- .../test/rocksdb-tests/RocksDBStreamTests.cpp | 2 +- libminifi/test/unit/FileStreamTests.cpp | 28 +++++----- libminifi/test/unit/SiteToSiteHelper.h | 2 +- nanofi/src/api/nanofi.cpp | 2 +- nanofi/tests/CSite2SiteTests.cpp | 11 ++-- 67 files changed, 458 insertions(+), 519 deletions(-) diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h index ad3072a..e55ac1e 100644 --- a/extensions/aws/processors/PutS3Object.h +++ b/extensions/aws/processors/PutS3Object.h @@ -31,6 +31,7 @@ #include "S3Processor.h" #include "utils/GeneralUtils.h" +#include "utils/gsl.h" template<typename T> class S3TestsFixture; @@ -94,20 +95,20 @@ class PutS3Object : public S3Processor { auto data_stream = std::make_shared<std::stringstream>(); read_size_ = 0; while (read_size_ < flow_size_) { - auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE); - int read_ret = stream->read(buffer, next_read_size); - if (read_ret < 0) { + const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE); + const auto read_ret = stream->read(buffer.data(), next_read_size); + if (read_ret == static_cast<size_t>(-1)) { return -1; } if (read_ret > 0) { - data_stream->write(reinterpret_cast<char*>(buffer.data()), next_read_size); + data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size)); read_size_ += read_ret; } else { break; } } result_ = s3_wrapper_.putObject(options_, data_stream); - return read_size_; + return gsl::narrow<int64_t>(read_size_); } uint64_t flow_size_; diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h index adf4a76..960185c 100644 --- a/extensions/civetweb/processors/ListenHTTP.h +++ b/extensions/civetweb/processors/ListenHTTP.h @@ -32,6 +32,7 @@ #include "core/Resource.h" #include "core/logging/LoggerConfiguration.h" #include "utils/MinifiConcurrentQueue.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -48,7 +49,7 @@ class ListenHTTP : public core::Processor { /*! * Create a new processor */ - ListenHTTP(const std::string& name, const utils::Identifier& uuid = {}) + explicit ListenHTTP(const std::string& name, const utils::Identifier& uuid = {}) : Processor(name, uuid), logger_(logging::LoggerFactory<ListenHTTP>::getLogger()), batch_size_(0) { @@ -56,7 +57,7 @@ class ListenHTTP : public core::Processor { callbacks_.log_access = &logAccess; } // Destructor - virtual ~ListenHTTP(); + ~ListenHTTP() override; // Processor Name static constexpr char const *ProcessorName = "ListenHTTP"; // Supported Properties @@ -131,14 +132,12 @@ class ListenHTTP : public core::Processor { } int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { out_str_->resize(stream->size()); - uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), - gsl::narrow<int>(stream->size())); - + const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), stream->size()); if (num_read != stream->size()) { throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream"); } - return num_read; + return gsl::narrow<int64_t>(num_read); } private: @@ -148,7 +147,7 @@ class ListenHTTP : public core::Processor { // Write callback for transferring data from HTTP request to content repo class WriteCallback : public OutputStreamCallback { public: - WriteCallback(std::unique_ptr<io::BufferStream>); + explicit WriteCallback(std::unique_ptr<io::BufferStream>); int64_t process(const std::shared_ptr<io::BaseStream>& stream) override; private: @@ -159,11 +158,11 @@ class ListenHTTP : public core::Processor { try { struct mg_context* ctx = mg_get_context(conn); /* CivetServer stores 'this' as the userdata when calling mg_start */ - CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx)); + auto* const server = static_cast<CivetServer*>(mg_get_user_data(ctx)); if (server == nullptr) { return 0; } - std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext())); + auto* const logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext())); if (logger == nullptr) { return 0; } diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h index 921a8c5..242990e 100644 --- a/extensions/http-curl/client/HTTPCallback.h +++ b/extensions/http-curl/client/HTTPCallback.h @@ -83,7 +83,7 @@ class HttpStreamingCallback : public ByteInputCallBack { if (stream->size() > 0) { vec.resize(stream->size()); - stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size())); + stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size()); } return processInner(std::move(vec)); diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp index 5ba7f3f..f9b8f49 100644 --- a/extensions/http-curl/client/HTTPStream.cpp +++ b/extensions/http-curl/client/HTTPStream.cpp @@ -51,7 +51,7 @@ void HttpStream::close() { http_read_callback_.close(); } -void HttpStream::seek(uint64_t /*offset*/) { +void HttpStream::seek(size_t /*offset*/) { // seek is an unnecessary part of this implementatino throw std::logic_error{"HttpStream::seek is unimplemented"}; } @@ -81,8 +81,7 @@ int HttpStream::write(const uint8_t *value, int size) { } } -int HttpStream::read(uint8_t *buf, int buflen) { - gsl_Expects(buflen >= 0); +size_t HttpStream::read(uint8_t *buf, size_t buflen) { if (buflen == 0) { return 0; } @@ -97,10 +96,10 @@ int HttpStream::read(uint8_t *buf, int buflen) { started_ = true; } } - return gsl::narrow<int>(http_read_callback_.readFully((char*) buf, buflen)); + return http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen); } else { - return -1; + return static_cast<size_t>(-1); } } diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h index 711d1d2..8d49d02 100644 --- a/extensions/http-curl/client/HTTPStream.h +++ b/extensions/http-curl/client/HTTPStream.h @@ -77,7 +77,7 @@ class HttpStream : public io::BaseStream { * Skip to the specified offset. * @param offset offset to which we will skip */ - void seek(uint64_t offset) override; + void seek(size_t offset) override; size_t size() const override { return written; @@ -91,7 +91,7 @@ class HttpStream : public io::BaseStream { * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override; + size_t read(uint8_t *buf, size_t buflen) override; /** * writes value to stream diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h index 9cbba27..e333b54 100644 --- a/extensions/http-curl/tests/CivetStream.h +++ b/extensions/http-curl/tests/CivetStream.h @@ -28,6 +28,8 @@ #include "io/BaseStream.h" #include "civetweb.h" #include "CivetServer.h" +#include "utils/gsl.h" + namespace org { namespace apache { namespace nifi { @@ -49,8 +51,10 @@ class CivetStream : public io::InputStream { * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override { - return mg_read(conn, buf, buflen); + size_t read(uint8_t *buf, size_t buflen) override { + const auto ret = mg_read(conn, buf, buflen); + if (ret < 0) return static_cast<size_t>(-1); + return gsl::narrow_cast<size_t>(ret); } protected: diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h index cedb834..5c49205 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -214,7 +214,7 @@ class FlowFileResponder : public ServerAwareHandler { minifi::io::CivetStream civet_stream(conn); minifi::io::CRCStream < minifi::io::CivetStream > stream(gsl::make_not_null(&civet_stream)); uint32_t num_attributes; - int read; + size_t read; uint64_t total_size = 0; read = stream.read(num_attributes); if(!isServerRunning())return false; @@ -241,9 +241,10 @@ class FlowFileResponder : public ServerAwareHandler { flow->data.resize(gsl::narrow<size_t>(length)); flow->total_size = total_size; - read = stream.read(flow->data.data(), gsl::narrow<int>(length)); - if(!isServerRunning())return false; - assert(read == gsl::narrow<int>(length)); + read = stream.read(flow->data.data(), length); + if (!isServerRunning()) return false; + (void)read; + assert(read == length); if (!invalid_checksum) { site2site_rest_resp = std::to_string(stream.getCRC()); diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h index 67798e9..f81391b 100644 --- a/extensions/jni/jvm/JniReferenceObjects.h +++ b/extensions/jni/jvm/JniReferenceObjects.h @@ -142,7 +142,8 @@ class JniByteInputStream : public minifi::InputStreamCallback { int writtenOffset = 0; int read = 0; do { - int actual = stream_->read(buffer_, std::min(remaining, buffer_size_)); + // JNI takes size as int, there's not much we can do here to support 2GB+ sizes + int actual = static_cast<int>(stream_->read(buffer_, std::min(remaining, buffer_size_))); if (actual <= 0) { if (read == 0) { stream_ = nullptr; diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h index fc7851e..df7fc75 100644 --- a/extensions/libarchive/CompressContent.h +++ b/extensions/libarchive/CompressContent.h @@ -21,6 +21,7 @@ #define __COMPRESS_CONTENT_H__ #include <cinttypes> +#include <utility> #include "archive_entry.h" #include "archive.h" @@ -56,7 +57,7 @@ public: , encapsulateInTar_(false) { } // Destructor - virtual ~CompressContent() = default; + ~CompressContent() override = default; // Processor Name static constexpr char const* ProcessorName = "CompressContent"; // Supported Properties @@ -94,8 +95,8 @@ public: ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) : flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) { } - ~ReadCallbackCompress() = default; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { + ~ReadCallbackCompress() override = default; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { uint8_t buffer[4096U]; int64_t ret = 0; uint64_t read_size = 0; @@ -107,13 +108,13 @@ public: return -1; } while (read_size < flow_->getSize()) { - ret = stream->read(buffer, sizeof(buffer)); - if (ret < 0) { + const auto readret = stream->read(buffer, sizeof(buffer)); + if (readret == static_cast<size_t>(-1)) { status_ = -1; return -1; } - if (ret > 0) { - ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret)); + if (readret > 0) { + ret = archive_write_data(arch_, buffer, readret); if (ret < 0) { logger_->log_error("Compress Content archive error %s", archive_error_string(arch_)); status_ = -1; @@ -124,7 +125,7 @@ public: break; } } - return read_size; + return gsl::narrow<int64_t>(read_size); } std::shared_ptr<core::FlowFile> flow_; struct archive *arch_; @@ -135,25 +136,23 @@ public: // Nest Callback Class for read stream from flow for decompress class ReadCallbackDecompress: public InputStreamCallback { public: - ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) : - read_size_(0), offset_(0), flow_(flow) { - origin_offset_ = flow_->getOffset(); + explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) : + flow_(std::move(flow)) { } - ~ReadCallbackDecompress() = default; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { + ~ReadCallbackDecompress() override = default; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { read_size_ = 0; stream->seek(offset_); - int readRet = stream->read(buffer_, sizeof(buffer_)); + const auto readRet = stream->read(buffer_, sizeof(buffer_)); read_size_ = readRet; - if (readRet > 0) { - offset_ += read_size_; + if (readRet != static_cast<size_t>(-1)) { + offset_ += readRet; } - return readRet; + return gsl::narrow<int64_t>(readRet); } - int64_t read_size_; - uint8_t buffer_[8192]; - uint64_t offset_; - uint64_t origin_offset_; + size_t read_size_ = 0; + uint8_t buffer_[8192] = {0}; + size_t offset_ = 0; std::shared_ptr<core::FlowFile> flow_; }; // Nest Callback Class for write stream @@ -383,8 +382,8 @@ public: std::vector<uint8_t> buffer(16 * 1024U); int64_t read_size = 0; while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) { - int ret = inputStream->read(buffer.data(), gsl::narrow<int>(buffer.size())); - if (ret < 0) { + const auto ret = inputStream->read(buffer.data(), buffer.size()); + if (ret == static_cast<size_t>(-1)) { return -1; } else if (ret == 0) { break; @@ -414,7 +413,7 @@ public: success_ = filterStream->isFinished(); - return flow_->getSize(); + return gsl::narrow<int64_t>(flow_->getSize()); } }; @@ -440,7 +439,7 @@ private: void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session); std::shared_ptr<logging::Logger> logger_; - int compressLevel_; + int compressLevel_{}; CompressionMode compressMode_; ExtendedCompressionFormat compressFormat_; bool updateFileName_; diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp index 59a43e0..9452dce 100644 --- a/extensions/libarchive/FocusArchiveEntry.cpp +++ b/extensions/libarchive/FocusArchiveEntry.cpp @@ -34,6 +34,7 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "Exception.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -150,20 +151,20 @@ typedef struct { la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) { auto data = static_cast<FocusArchiveEntryReadData *>(d); *buf = data->buf; - int read = 0; - int last_read = 0; + size_t read = 0; + size_t last_read = 0; do { last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 8196 - read); read += last_read; - } while (data->processor->isRunning() && last_read > 0 && read < 8196); + } while (data->processor->isRunning() && last_read > 0 && last_read != static_cast<size_t>(-1) && read < 8196); if (!data->processor->isRunning()) { archive_set_error(a, EINTR, "Processor shut down during read"); return -1; } - return read; + return gsl::narrow<la_ssize_t>(read); } int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) { diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index b83d028..bbb66a8 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -409,14 +409,13 @@ class ReadCallback : public InputStreamCallback { } for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) { - const int readRet = stream->read(buffer.data(), gsl::narrow<int>(buffer.size())); - if (readRet < 0) { + const auto readRet = stream->read(buffer.data(), buffer.size()); + if (readRet == static_cast<size_t>(-1)) { status_ = -1; error_ = "Failed to read from stream"; return read_size_; } - - if (readRet <= 0) { break; } + if (readRet == 0) { break; } const auto err = produce(segment_num, buffer, readRet); if (err) { diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h index 47f53d0..659af59 100644 --- a/extensions/mqtt/processors/ConvertJSONAck.h +++ b/extensions/mqtt/processors/ConvertJSONAck.h @@ -31,6 +31,8 @@ #include "MQTTClient.h" #include "c2/protocols/RESTProtocol.h" #include "ConvertBase.h" +#include "utils/gsl.h" + namespace org { namespace apache { namespace nifi { @@ -52,7 +54,7 @@ class ConvertJSONAck : public ConvertBase { logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) { } // Destructor - virtual ~ConvertJSONAck() = default; + ~ConvertJSONAck() override = default; // Processor Name static constexpr char const* ProcessorName = "ConvertJSONAck"; @@ -72,14 +74,13 @@ class ConvertJSONAck : public ConvertBase { class ReadCallback : public InputStreamCallback { public: ReadCallback() = default; - ~ReadCallback() = default; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t ret = 0; + ~ReadCallback() override = default; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { if (nullptr == stream) return 0; buffer_.resize(stream->size()); - ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size()); - return ret; + const auto ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size()); + return gsl::narrow<int64_t>(ret); } std::vector<char> buffer_; }; diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp index ca5a8d5..69da1fc 100644 --- a/extensions/mqtt/processors/ConvertUpdate.cpp +++ b/extensions/mqtt/processors/ConvertUpdate.cpp @@ -37,7 +37,7 @@ void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte bool received_update = false; while (mqtt_service_->get(100, listening_topic, update)) { // first we have the input topic string followed by the update URI - if (update.size() > 0) { + if (!update.empty()) { io::BufferStream stream(update.data(), update.size()); diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h index fdad09f..ba14c0f 100644 --- a/extensions/mqtt/processors/PublishMQTT.h +++ b/extensions/mqtt/processors/PublishMQTT.h @@ -20,6 +20,8 @@ #ifndef __PUBLISH_MQTT_H__ #define __PUBLISH_MQTT_H__ +#include <limits> + #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" @@ -50,7 +52,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor { max_seg_size_ = ULLONG_MAX; } // Destructor - virtual ~PublishMQTT() = default; + ~PublishMQTT() override = default; // Processor Name static constexpr char const* ProcessorName = "PublishMQTT"; // Supported Properties @@ -74,18 +76,20 @@ class PublishMQTT : public processors::AbstractMQTTProcessor { status_ = 0; read_size_ = 0; } - ~ReadCallback() = default; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { + ~ReadCallback() override = default; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { if (flow_size_ < max_seg_size_) max_seg_size_ = flow_size_; + gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max())); std::vector<unsigned char> buffer(max_seg_size_); read_size_ = 0; status_ = 0; while (read_size_ < flow_size_) { - int readRet = stream->read(&buffer[0], max_seg_size_); + // MQTTClient_message::payloadlen is int, so we can't handle 2GB+ + int readRet = static_cast<int>(stream->read(&buffer[0], max_seg_size_)); if (readRet < 0) { status_ = -1; - return read_size_; + return gsl::narrow<int64_t>(read_size_); } if (readRet > 0) { MQTTClient_message pubmsg = MQTTClient_message_initializer; @@ -97,12 +101,12 @@ class PublishMQTT : public processors::AbstractMQTTProcessor { status_ = -1; return -1; } - read_size_ += readRet; + read_size_ += gsl::narrow<size_t>(readRet); } else { break; } } - return read_size_; + return gsl::narrow<int64_t>(read_size_); } uint64_t flow_size_; uint64_t max_seg_size_; diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp index 1f3fe63..e613d1b 100644 --- a/extensions/opc/src/putopc.cpp +++ b/extensions/opc/src/putopc.cpp @@ -428,21 +428,15 @@ namespace processors { uint64_t size = 0; do { - int read = stream->read(buf_.data() + size, 1024); - - if (read < 0) { - return -1; - } - - if (read == 0) { - break; - } + const auto read = stream->read(buf_.data() + size, 1024); + if (read == static_cast<size_t>(-1)) return -1; + if (read == 0) break; size += read; } while (size < stream->size()); logger_->log_trace("Read %llu bytes from flowfile content to buffer", stream->size()); - return size; + return gsl::narrow<int64_t>(size); } } /* namespace processors */ diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp index 86daa20..27b342e 100644 --- a/extensions/rocksdb-repos/RocksDbStream.cpp +++ b/extensions/rocksdb-repos/RocksDbStream.cpp @@ -47,7 +47,7 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R void RocksDbStream::close() { } -void RocksDbStream::seek(uint64_t /*offset*/) { +void RocksDbStream::seek(size_t /*offset*/) { // noop } @@ -84,28 +84,17 @@ int RocksDbStream::write(const uint8_t *value, int size) { } } -int RocksDbStream::read(uint8_t *buf, int buflen) { - gsl_Expects(buflen >= 0); - if (!exists_) { - return -1; - } - if (buflen == 0) { - return 0; - } - if (!IsNullOrEmpty(buf)) { - size_t amtToRead = gsl::narrow<size_t>(buflen); - if (offset_ >= value_.size()) { - return 0; - } - if (amtToRead > value_.size() - offset_) { - amtToRead = value_.size() - offset_; - } - std::memcpy(buf, value_.data() + offset_, amtToRead); - offset_ += amtToRead; - return gsl::narrow<int>(amtToRead); - } else { - return -1; - } +size_t RocksDbStream::read(uint8_t *buf, size_t buflen) { + // The check have to be in this order for RocksDBStreamTest "Read zero bytes" to succeed + if (!exists_) return static_cast<size_t>(-1); + if (buflen == 0) return 0; + if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1); + if (offset_ >= value_.size()) return 0; + + const auto amtToRead = std::min(buflen, value_.size() - offset_); + std::memcpy(buf, value_.data() + offset_, amtToRead); + offset_ += amtToRead; + return amtToRead; } } /* namespace io */ diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h index 8e47b51..d051802 100644 --- a/extensions/rocksdb-repos/RocksDbStream.h +++ b/extensions/rocksdb-repos/RocksDbStream.h @@ -56,7 +56,7 @@ class RocksDbStream : public io::BaseStream { * Skip to the specified offset. * @param offset offset to which we will skip */ - void seek(uint64_t offset) override; + void seek(size_t offset) override; size_t size() const override { return size_; @@ -70,7 +70,7 @@ class RocksDbStream : public io::BaseStream { * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override; + size_t read(uint8_t *buf, size_t buflen) override; /** * writes value to stream diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp index fcde480..1733994 100644 --- a/extensions/script/python/PyBaseStream.cpp +++ b/extensions/script/python/PyBaseStream.cpp @@ -49,13 +49,11 @@ py::bytes PyBaseStream::read(size_t len) { std::vector<uint8_t> buffer(len); - auto read = stream_->read(buffer.data(), static_cast<int>(len)); - auto result = py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read)); - - return result; + const auto read = stream_->read(buffer.data(), len); + return py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read)); } -size_t PyBaseStream::write(py::bytes buf) { +size_t PyBaseStream::write(const py::bytes& buf) { const auto &&buf_str = buf.operator std::string(); return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf_str.data())), static_cast<int>(buf_str.length()))); diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyBaseStream.h index f07fe80..1f11065 100644 --- a/extensions/script/python/PyBaseStream.h +++ b/extensions/script/python/PyBaseStream.h @@ -37,7 +37,7 @@ class PyBaseStream { py::bytes read(); py::bytes read(size_t len = 0); - size_t write(py::bytes buf); + size_t write(const py::bytes& buf); private: std::shared_ptr<io::BaseStream> stream_; diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp index e657d2e..8d528ab 100644 --- a/extensions/sftp/client/SFTPClient.cpp +++ b/extensions/sftp/client/SFTPClient.cpp @@ -562,12 +562,12 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov return true; } - const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min<size_t>(expected_size, MAX_BUFFER_SIZE); + const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min(gsl::narrow<size_t>(expected_size), MAX_BUFFER_SIZE); std::vector<uint8_t> buf(buf_size); uint64_t total_read = 0U; do { - int read_ret = input.read(buf.data(), buf.size()); - if (read_ret < 0) { + const auto read_ret = input.read(buf.data(), buf.size()); + if (read_ret == static_cast<size_t>(-1)) { last_error_.setLibssh2Error(LIBSSH2_FX_OK); logger_->log_error("Error while reading input"); return false; @@ -577,20 +577,20 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov } logger_->log_trace("Read %d bytes", read_ret); total_read += read_ret; - ssize_t remaining = read_ret; + auto remaining = gsl::narrow<size_t>(read_ret); while (remaining > 0) { - int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining); + const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining); if (write_ret < 0) { last_error_.setSftpError(SFTPError::IoError); logger_->log_error("Failed to write remote file \"%s\"", path.c_str()); return false; } logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str()); - remaining -= write_ret; + remaining -= gsl::narrow<size_t>(write_ret); } } while (true); - if (expected_size >= 0 && total_read != gsl::narrow<uint64_t>(expected_size)) { + if (expected_size >= 0 && total_read != gsl::narrow<size_t>(expected_size)) { last_error_.setLibssh2Error(LIBSSH2_FX_OK); logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read); return false; diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp index 345b84a..04e5e91 100644 --- a/extensions/standard-processors/processors/ExtractText.cpp +++ b/extensions/standard-processors/processors/ExtractText.cpp @@ -113,10 +113,9 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession } int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t ret = 0; - uint64_t read_size = 0; + size_t read_size = 0; bool regex_mode; - uint64_t size_limit = flowFile_->getSize(); + size_t size_limit = flowFile_->getSize(); std::string attrKey, sizeLimitStr; ctx_->getProperty(Attribute.getName(), attrKey); @@ -126,22 +125,22 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream> if (sizeLimitStr.empty()) size_limit = DEFAULT_SIZE_LIMIT; else if (sizeLimitStr != "0") - size_limit = std::stoi(sizeLimitStr); + size_limit = gsl::narrow_cast<size_t>(std::stoi(sizeLimitStr)); std::ostringstream contentStream; while (read_size < size_limit) { // Don't read more than config limit or the size of the buffer - int length = gsl::narrow<int>(std::min<uint64_t>(size_limit - read_size, buffer_.size())); - ret = stream->read(buffer_, length); + const auto length = std::min(size_limit - read_size, buffer_.size()); + const auto ret = stream->read(buffer_, length); - if (ret < 0) { + if (ret == static_cast<size_t>(-1)) { return -1; // Stream error } else if (ret == 0) { break; // End of stream, no more data } - contentStream.write(reinterpret_cast<const char*>(buffer_.data()), ret); + contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret)); read_size += ret; if (contentStream.fail()) { return -1; @@ -162,9 +161,8 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream> bool repeatingcapture; ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture); - int maxCaptureSizeProperty; - ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSizeProperty); - size_t maxCaptureSize = gsl::narrow<size_t>(maxCaptureSizeProperty); + size_t maxCaptureSize; + ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSize); std::string contentStr = contentStream.str(); @@ -212,7 +210,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream> } else { flowFile_->setAttribute(attrKey, contentStream.str()); } - return read_size; + return gsl::narrow<int64_t>(read_size); } ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx, std::shared_ptr<logging::Logger> lgr) diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h index 6d4b2ef..7a7d920 100644 --- a/extensions/standard-processors/processors/LogAttribute.h +++ b/extensions/standard-processors/processors/LogAttribute.h @@ -47,14 +47,14 @@ class LogAttribute : public core::Processor { * Create a new processor */ explicit LogAttribute(const std::string& name, const utils::Identifier& uuid = {}) - : Processor(std::move(name), uuid), + : Processor(name, uuid), flowfiles_to_log_(1), hexencode_(false), max_line_length_(80U), logger_(logging::LoggerFactory<LogAttribute>::getLogger()) { } // Destructor - virtual ~LogAttribute() = default; + ~LogAttribute() override = default; // Processor Name static constexpr char const* ProcessorName = "LogAttribute"; // Supported Properties @@ -103,16 +103,14 @@ class LogAttribute : public core::Processor { : logger_(std::move(logger)) , buffer_(size) { } - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - if (buffer_.size() == 0U) { - return 0U; - } - int ret = stream->read(buffer_.data(), gsl::narrow<int>(buffer_.size())); - if (ret < 0 || static_cast<uint64_t>(ret) != buffer_.size()) { - logger_->log_error("%zu bytes were requested from the stream but %d bytes were read. Rolling back.", buffer_.size(), ret); + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + if (buffer_.empty()) return 0U; + const auto ret = stream->read(buffer_.data(), buffer_.size()); + if (ret != buffer_.size()) { + logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret}); throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile."); } - return buffer_.size(); + return gsl::narrow<int64_t>(buffer_.size()); } std::shared_ptr<logging::Logger> logger_; std::vector<uint8_t> buffer_; @@ -123,7 +121,7 @@ class LogAttribute : public core::Processor { // OnTrigger method, implemented by NiFi LogAttribute void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; // Initialize, over write by NiFi LogAttribute - void initialize(void) override; + void initialize() override; private: uint64_t flowfiles_to_log_; diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp index 3765856..2060867 100644 --- a/extensions/standard-processors/processors/PutFile.cpp +++ b/extensions/standard-processors/processors/PutFile.cpp @@ -25,10 +25,12 @@ #include <memory> #include <string> #include <set> +#include <utility> #ifdef WIN32 #include <Windows.h> #endif #include "utils/file/FileUtils.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -304,9 +306,9 @@ void PutFile::getDirectoryPermissions(core::ProcessContext *context) { } #endif -PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::string &dest_file) - : tmp_file_(tmp_file), - dest_file_(dest_file) { +PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file) + : tmp_file_(std::move(tmp_file)), + dest_file_(std::move(dest_file)) { } // Copy the entire file contents to the temporary file @@ -319,17 +321,10 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary); do { - int read = stream->read(buffer, 1024); - - if (read < 0) { - return -1; - } - - if (read == 0) { - break; - } - - tmp_file_os.write(reinterpret_cast<char *>(buffer), read); + const auto read = stream->read(buffer, 1024); + if (read == static_cast<size_t>(-1)) return -1; + if (read == 0) break; + tmp_file_os.write(reinterpret_cast<char *>(buffer), gsl::narrow<std::streamsize>(read)); size += read; } while (size < stream->size()); @@ -339,7 +334,7 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st write_succeeded_ = true; } - return size; + return gsl::narrow<int64_t>(size); } // Renames tmp file to final destination diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h index c09bffb..487213b 100644 --- a/extensions/standard-processors/processors/PutFile.h +++ b/extensions/standard-processors/processors/PutFile.h @@ -82,7 +82,7 @@ class PutFile : public core::Processor { class ReadCallback : public InputStreamCallback { public: - ReadCallback(const std::string &tmp_file, const std::string &dest_file); + ReadCallback(std::string tmp_file, std::string dest_file); ~ReadCallback() override; int64_t process(const std::shared_ptr<io::BaseStream>& stream) override; bool commit(); diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h index 75b2d4c..e33b3c5 100644 --- a/libminifi/include/io/AtomicEntryStream.h +++ b/libminifi/include/io/AtomicEntryStream.h @@ -64,7 +64,7 @@ class AtomicEntryStream : public BaseStream { * Skip to the specified offset. * @param offset offset to which we will skip */ - void seek(uint64_t offset) override; + void seek(size_t offset) override; size_t size() const override { return length_; @@ -75,7 +75,7 @@ class AtomicEntryStream : public BaseStream { * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override; + size_t read(uint8_t *buf, size_t buflen) override; /** * writes value to stream @@ -103,7 +103,7 @@ AtomicEntryStream<T>::~AtomicEntryStream() { } template<typename T> -void AtomicEntryStream<T>::seek(uint64_t offset) { +void AtomicEntryStream<T>::seek(size_t offset) { std::lock_guard<std::recursive_mutex> lock(entry_lock_); offset_ = gsl::narrow<size_t>(offset); } @@ -129,14 +129,13 @@ int AtomicEntryStream<T>::write(const uint8_t *value, int size) { } template<typename T> -int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) { - gsl_Expects(buflen >= 0); +size_t AtomicEntryStream<T>::read(uint8_t *buf, size_t buflen) { if (buflen == 0) { return 0; } if (nullptr != buf && !invalid_stream_) { std::lock_guard<std::recursive_mutex> lock(entry_lock_); - int len = buflen; + auto len = buflen; core::repository::RepoValue<T> *value; if (entry_->getValue(key_, &value)) { if (offset_ + len > value->getBufferSize()) { @@ -152,7 +151,7 @@ int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) { return len; } } - return -1; + return static_cast<size_t>(-1); } } // namespace io diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h index 3f0bfdc..7ab2b9e 100644 --- a/libminifi/include/io/BufferStream.h +++ b/libminifi/include/io/BufferStream.h @@ -35,7 +35,7 @@ class BufferStream : public BaseStream { public: BufferStream() = default; - BufferStream(const uint8_t *buf, const unsigned int len) { + BufferStream(const uint8_t *buf, const size_t len) { write(buf, len); } @@ -48,7 +48,7 @@ class BufferStream : public BaseStream { int write(const uint8_t* data, int len) final; - int read(uint8_t* buffer, int len) override; + size_t read(uint8_t* buffer, size_t len) override; int initialize() override { buffer_.clear(); @@ -56,7 +56,7 @@ class BufferStream : public BaseStream { return 0; } - void seek(uint64_t offset) override { + void seek(size_t offset) override { readOffset_ += offset; } diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h index 2204537..abe0762 100644 --- a/libminifi/include/io/CRCStream.h +++ b/libminifi/include/io/CRCStream.h @@ -84,9 +84,9 @@ class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStr public: using InputStream::read; - int read(uint8_t *buf, int buflen) override { - int ret = child_stream_->read(buf, buflen); - if (ret > 0) { + size_t read(uint8_t *buf, size_t buflen) override { + const auto ret = child_stream_->read(buf, buflen); + if (ret > 0 && ret != static_cast<size_t>(-1)) { crc_ = crc32(crc_, buf, ret); } return ret; diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index 37626c8..398a1d5 100644 --- a/libminifi/include/io/ClientSocket.h +++ b/libminifi/include/io/ClientSocket.h @@ -159,7 +159,7 @@ class Socket : public BaseStream { * @param buflen * @param retrieve_all_bytes determines if we should read all bytes before returning */ - int read(uint8_t *buf, int buflen) override { + size_t read(uint8_t *buf, size_t buflen) override { return read(buf, buflen, true); } @@ -169,7 +169,7 @@ class Socket : public BaseStream { * @param buflen * @param retrieve_all_bytes determines if we should read all bytes before returning */ - virtual int read(uint8_t *buf, int buflen, bool retrieve_all_bytes); + virtual size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes); protected: /** diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h index a7ea713..ee5aec7 100644 --- a/libminifi/include/io/DescriptorStream.h +++ b/libminifi/include/io/DescriptorStream.h @@ -52,14 +52,14 @@ class DescriptorStream : public io::BaseStream { * Skip to the specified offset. * @param offset offset to which we will skip */ - void seek(uint64_t offset) override; + void seek(size_t offset) override; /** * Reads data and places it into buf * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override; + size_t read(uint8_t *buf, size_t buflen) override; /** * writes value to stream diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h index 3156a30..ad7f0dc 100644 --- a/libminifi/include/io/FileStream.h +++ b/libminifi/include/io/FileStream.h @@ -63,7 +63,7 @@ class FileStream : public io::BaseStream { * Skip to the specified offset. * @param offset offset to which we will skip */ - void seek(uint64_t offset) override; + void seek(size_t offset) override; size_t size() const override { return length_; @@ -77,7 +77,7 @@ class FileStream : public io::BaseStream { * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override; + size_t read(uint8_t *buf, size_t buflen) override; /** * writes value to stream diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h index 60a297c..cebf266 100644 --- a/libminifi/include/io/InputStream.h +++ b/libminifi/include/io/InputStream.h @@ -39,32 +39,32 @@ class InputStream : public virtual Stream { * reads a byte array from the stream * @param value reference in which will set the result * @param len length to read - * @return resulting read size + * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) for ClientSocket EAGAIN **/ - virtual int read(uint8_t *value, int len) = 0; + virtual size_t read(uint8_t *value, size_t len) = 0; - int read(std::vector<uint8_t>& buffer, int len); + size_t read(std::vector<uint8_t>& buffer, size_t len); /** * read string from stream * @param str reference string * @return resulting read size **/ - int read(std::string &str, bool widen = false); + size_t read(std::string &str, bool widen = false); /** * read a bool from stream * @param value reference to the output * @return resulting read size **/ - int read(bool& value); + size_t read(bool& value); /** * read a uuid from stream * @param value reference to the output * @return resulting read size **/ - int read(utils::Identifier& value); + size_t read(utils::Identifier& value); /** * reads sizeof(Integral) bytes from the stream @@ -72,10 +72,10 @@ class InputStream : public virtual Stream { * @return resulting read size **/ template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>> - int read(Integral& value) { + size_t read(Integral& value) { uint8_t buf[sizeof(Integral)]{}; if (read(buf, sizeof(Integral)) != sizeof(Integral)) { - return -1; + return static_cast<size_t>(-1); } value = 0; diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h index e067b52..cb528e5 100644 --- a/libminifi/include/io/Stream.h +++ b/libminifi/include/io/Stream.h @@ -31,7 +31,7 @@ class Stream { public: virtual void close() {} - virtual void seek(uint64_t /*offset*/) { + virtual void seek(size_t /*offset*/) { throw std::runtime_error("Seek is not supported"); } diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h index b3fafce..8c88568 100644 --- a/libminifi/include/io/StreamPipe.h +++ b/libminifi/include/io/StreamPipe.h @@ -49,14 +49,10 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar uint8_t buffer[4096U]; int64_t totalTransferred = 0; while (true) { - int readRet = src->read(buffer, sizeof(buffer)); - if (readRet < 0) { - return readRet; - } - if (readRet == 0) { - break; - } - int remaining = readRet; + const auto readRet = src->read(buffer, sizeof(buffer)); + if (readRet == static_cast<size_t>(-1)) return -1; + if (readRet == 0) break; + auto remaining = readRet; int transferred = 0; while (remaining > 0) { int writeRet = dst->write(buffer + transferred, remaining); diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h index 7699910..5f285c1 100644 --- a/libminifi/include/io/tls/SecureDescriptorStream.h +++ b/libminifi/include/io/tls/SecureDescriptorStream.h @@ -63,7 +63,7 @@ class SecureDescriptorStream : public io::BaseStream { * Skip to the specified offset. * @param offset offset to which we will skip */ - void seek(uint64_t offset) override; + void seek(size_t offset) override; size_t size() const override { return -1; @@ -74,7 +74,7 @@ class SecureDescriptorStream : public io::BaseStream { * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override; + size_t read(uint8_t *buf, size_t buflen) override; /** * writes value to stream diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index 6bf741d..24df1d9 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -136,14 +136,14 @@ class TLSSocket : public Socket { using Socket::read; using Socket::write; - int read(uint8_t *buf, int buflen, bool retrieve_all_bytes) override; + size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) override; /** * Reads data and places it into buf * @param buf buffer in which we extract data * @param buflen */ - int read(uint8_t *buf, int buflen) override; + size_t read(uint8_t *buf, size_t buflen) override; /** * Write value to the stream using uint8_t ptr diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 3b7b4f7..60ee8c2 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -363,25 +363,23 @@ class ProvenanceEventRecord : public core::SerializableComponent { uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) { const auto size = std::min<size_t>(72, bufferSize); - org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<int>(size)); + org::apache::nifi::minifi::io::BufferStream outStream(buffer, size); std::string uuid; - int ret = outStream.read(uuid); - - if (ret <= 0) { + const auto uuidret = outStream.read(uuid); + if (uuidret <= 0) { return 0; } uint32_t eventType; - ret = outStream.read(eventType); - if (ret != 4) { + const auto typeret = outStream.read(eventType); + if (typeret != 4) { return 0; } uint64_t event_time; - - ret = outStream.read(event_time); - if (ret != 8) { + const auto timeret = outStream.read(event_time); + if (timeret != 8) { return 0; } diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h index a82e3b4..d2138ea 100644 --- a/libminifi/include/sitetosite/Peer.h +++ b/libminifi/include/sitetosite/Peer.h @@ -299,7 +299,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { return stream_->write(data, len); } - int read(uint8_t* data, int len) override { + size_t read(uint8_t* data, size_t len) override { return stream_->read(data, len); } diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index fc1dc91..7a41e81 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -19,6 +19,7 @@ #ifndef LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_ #define LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_ +#include <algorithm> #include <map> #include <memory> #include <string> @@ -30,6 +31,7 @@ #include "core/ProcessSession.h" #include "core/ProcessContext.h" #include "core/Connectable.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -65,11 +67,6 @@ class SiteToSiteClient : public core::Connectable { _batchSendNanos(5000000000), ssl_context_service_(nullptr), logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) { - _supportedVersion[0] = 5; - _supportedVersion[1] = 4; - _supportedVersion[2] = 3; - _supportedVersion[3] = 2; - _supportedVersion[4] = 1; _currentVersion = _supportedVersion[0]; _currentVersionIndex = 0; _supportedCodecVersion[0] = 1; @@ -77,7 +74,7 @@ class SiteToSiteClient : public core::Connectable { _currentCodecVersionIndex = 0; } - virtual ~SiteToSiteClient() = default; + ~SiteToSiteClient() override = default; void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextService> &context_service) { ssl_context_service_ = context_service; @@ -189,13 +186,13 @@ class SiteToSiteClient : public core::Connectable { return logger_; } - virtual void yield() { + void yield() override { } /** * Determines if we are connected and operating */ - virtual bool isRunning() { + bool isRunning() override { return running_; } @@ -203,7 +200,7 @@ class SiteToSiteClient : public core::Connectable { * Determines if work is available by this connectable * @return boolean if work is available. */ - virtual bool isWorkAvailable() { + bool isWorkAvailable() override { return true; } @@ -234,12 +231,12 @@ class SiteToSiteClient : public core::Connectable { virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message); // getRespondCodeContext virtual RespondCodeContext *getRespondCodeContext(RespondCode code) { - for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) { - if (SiteToSiteRequest::respondCodeContext[i].code == code) { - return &SiteToSiteRequest::respondCodeContext[i]; + for (auto & i : SiteToSiteRequest::respondCodeContext) { + if (i.code == code) { + return &i; } } - return NULL; + return nullptr; } // Peer State @@ -254,7 +251,7 @@ class SiteToSiteClient : public core::Connectable { // Peer Connection std::unique_ptr<SiteToSitePeer> peer_; - std::atomic<bool> running_; + std::atomic<bool> running_{false}; // transaction map std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_; @@ -265,10 +262,10 @@ class SiteToSiteClient : public core::Connectable { /*** * versioning */ - uint32_t _supportedVersion[5]; + uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1}; uint32_t _currentVersion; int _currentVersionIndex; - uint32_t _supportedCodecVersion[1]; + uint32_t _supportedCodecVersion[1] = {1}; uint32_t _currentCodecVersion; int _currentCodecVersionIndex; @@ -286,13 +283,13 @@ class WriteCallback : public OutputStreamCallback { } DataPacket *_packet; // void process(std::ofstream *stream) { - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { uint8_t buffer[16384]; uint64_t len = _packet->_size; uint64_t total = 0; while (len > 0) { - int size = len < 16384 ? static_cast<int>(len) : 16384; - int ret = _packet->transaction_->getStream().read(buffer, size); + const auto size = std::min(len, size_t{16384}); + const auto ret = _packet->transaction_->getStream().read(buffer, size); if (ret != size) { logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len; return -1; @@ -302,7 +299,7 @@ class WriteCallback : public OutputStreamCallback { total += size; } logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream"; - return len; + return gsl::narrow<int64_t>(len); } }; // Nest Callback Class for read stream @@ -312,21 +309,15 @@ class ReadCallback : public InputStreamCallback { : _packet(packet) { } DataPacket *_packet; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { _packet->_size = 0; uint8_t buffer[8192] = { 0 }; - int readSize; size_t size = 0; do { - readSize = stream->read(buffer, 8192); - - if (readSize == 0) { - break; - } - if (readSize < 0) { - return -1; - } - int ret = _packet->transaction_->getStream().write(buffer, readSize); + const auto readSize = stream->read(buffer, 8192); + if (readSize == 0) break; + if (readSize == static_cast<size_t>(-1)) return -1; + const auto ret = _packet->transaction_->getStream().write(buffer, readSize); if (ret != readSize) { logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret; return -1; @@ -334,7 +325,7 @@ class ReadCallback : public InputStreamCallback { size += readSize; } while (size < stream->size()); _packet->_size = size; - return size; + return gsl::narrow<int64_t>(size); } }; diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h index 84e3362..ace72b6 100644 --- a/libminifi/include/utils/ByteArrayCallback.h +++ b/libminifi/include/utils/ByteArrayCallback.h @@ -24,6 +24,7 @@ #include "concurrentqueue.h" #include "FlowFileRecord.h" #include "core/logging/LoggerConfiguration.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -45,10 +46,10 @@ class ByteInputCallBack : public InputStreamCallback { if (stream->size() > 0) { vec.resize(stream->size()); - stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size())); + stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size()); } - return vec.size(); + return gsl::narrow<int64_t>(vec.size()); } virtual void seek(size_t) { } @@ -101,7 +102,7 @@ class ByteOutputCallback : public OutputStreamCallback { virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream); - virtual const std::vector<char> to_string(); + virtual std::vector<char> to_string(); virtual void close(); diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h index 440b158..00facc4 100644 --- a/libminifi/include/utils/Enum.h +++ b/libminifi/include/utils/Enum.h @@ -72,8 +72,8 @@ namespace utils { #define SMART_ENUM_BODY(Clazz, ...) \ constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \ - Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \ - Clazz(const char* str) : value_{parse(str).value_} {} \ + explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \ + explicit Clazz(const char* str) : value_{parse(str).value_} {} \ private: \ Type value_; \ public: \ diff --git a/libminifi/include/utils/FileOutputCallback.h b/libminifi/include/utils/FileOutputCallback.h index 1343fae..ee68db5 100644 --- a/libminifi/include/utils/FileOutputCallback.h +++ b/libminifi/include/utils/FileOutputCallback.h @@ -51,7 +51,7 @@ class FileOutputCallback : public ByteOutputCallback { int64_t process(const std::shared_ptr<io::BaseStream>& stream) override; - const std::vector<char> to_string() override; + std::vector<char> to_string() override; void close() override; diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 813137b..047c571 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -161,11 +161,9 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi } std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) { - int ret; - auto file = std::make_shared<FlowFileRecord>(); - ret = inStream.read(file->event_time_); + auto ret = inStream.read(file->event_time_); if (ret != 8) { return {}; } @@ -181,12 +179,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS } ret = inStream.read(file->uuid_); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return {}; } ret = inStream.read(container); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return {}; } @@ -200,12 +198,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS for (uint32_t i = 0; i < numAttributes; i++) { std::string key; ret = inStream.read(key, true); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return {}; } std::string value; ret = inStream.read(value, true); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return {}; } file->attributes_[key] = value; @@ -213,7 +211,7 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS std::string content_full_path; ret = inStream.read(content_full_path); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return {}; } diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index 8322302..73b74ff 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -101,8 +101,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro case Operation::START: { std::string componentStr; - int size = stream->read(componentStr); - if ( size != -1 ) { + const auto size = stream->read(componentStr); + if ( size != static_cast<size_t>(-1) ) { auto components = update_sink_->getComponents(componentStr); for (const auto& component : components) { component->start(); @@ -115,8 +115,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro case Operation::STOP: { std::string componentStr; - int size = stream->read(componentStr); - if ( size != -1 ) { + const auto size = stream->read(componentStr); + if ( size != static_cast<size_t>(-1) ) { auto components = update_sink_->getComponents(componentStr); for (const auto& component : components) { component->stop(); @@ -129,8 +129,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro case Operation::CLEAR: { std::string connection; - int size = stream->read(connection); - if ( size != -1 ) { + const auto size = stream->read(connection); + if ( size != static_cast<size_t>(-1) ) { update_sink_->clearConnection(connection); } } @@ -138,18 +138,18 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro case Operation::UPDATE: { std::string what; - int size = stream->read(what); - if (size == -1) { + const auto size = stream->read(what); + if (size == static_cast<size_t>(-1)) { logger_->log_debug("Connection broke"); break; } if (what == "flow") { std::string ff_loc; - int size = stream->read(ff_loc); + const auto size = stream->read(ff_loc); std::ifstream tf(ff_loc); std::string configuration((std::istreambuf_iterator<char>(tf)), std::istreambuf_iterator<char>()); - if (size == -1) { + if (size == static_cast<size_t>(-1)) { logger_->log_debug("Connection broke"); break; } @@ -160,15 +160,15 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro case Operation::DESCRIBE: { std::string what; - int size = stream->read(what); - if (size == -1) { + const auto size = stream->read(what); + if (size == static_cast<size_t>(-1)) { logger_->log_debug("Connection broke"); break; } if (what == "queue") { std::string connection; - int size = stream->read(connection); - if (size == -1) { + const auto size_ = stream->read(connection); + if (size_ == static_cast<size_t>(-1)) { logger_->log_debug("Connection broke"); break; } @@ -184,8 +184,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro } else if (what == "components") { io::BufferStream resp; resp.write(&head, 1); - uint16_t size = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size()); - resp.write(size); + const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size()); + resp.write(size_); for (const auto &component : update_sink_->getAllComponents()) { resp.write(component->getComponentName()); resp.write(component->isRunning() ? "true" : "false"); @@ -210,8 +210,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro } else if (what == "connections") { io::BufferStream resp; resp.write(&head, 1); - uint16_t size = gsl::narrow<uint16_t>(queue_full_.size()); - resp.write(size); + const auto size_ = gsl::narrow<uint16_t>(queue_full_.size()); + resp.write(size_); for (const auto &connection : queue_full_) { resp.write(connection.first, false); } @@ -220,17 +220,17 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro std::vector<std::string> full_connections; { std::lock_guard<std::mutex> lock(controller_mutex_); - for (auto conn : queue_full_) { - if (conn.second == true) { + for (const auto& conn : queue_full_) { + if (conn.second) { full_connections.push_back(conn.first); } } } io::BufferStream resp; resp.write(&head, 1); - uint16_t full_connection_count = gsl::narrow<uint16_t>(full_connections.size()); + const auto full_connection_count = gsl::narrow<uint16_t>(full_connections.size()); resp.write(full_connection_count); - for (auto conn : full_connections) { + for (const auto& conn : full_connections) { resp.write(conn); } stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size())); @@ -248,11 +248,11 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) { for (const auto &payload_content : content) { if (payload_content.name == "Components") { - for (auto content : payload_content.operation_arguments) { + for (const auto& content_ : payload_content.operation_arguments) { bool is_enabled = false; - minifi::utils::StringUtils::StringToBool(content.second.to_string(), is_enabled); + minifi::utils::StringUtils::StringToBool(content_.second.to_string(), is_enabled); std::lock_guard<std::mutex> lock(controller_mutex_); - component_map_[content.first] = is_enabled; + component_map_[content_.first] = is_enabled; } } } @@ -261,21 +261,20 @@ void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) { if (server_socket_ == nullptr) return 0; - const std::vector<C2ContentResponse> &content = payload.getContent(); for (const auto &pc : payload.getNestedPayloads()) { if (pc.getLabel() == "flowInfo" || pc.getLabel() == "metrics") { for (const auto &metrics_payload : pc.getNestedPayloads()) { if (metrics_payload.getLabel() == "QueueMetrics" || metrics_payload.getLabel() == "queues") { for (const auto &queue_metrics : metrics_payload.getNestedPayloads()) { - auto metric_content = queue_metrics.getContent(); - for (const auto &payload_content : queue_metrics.getContent()) { + const auto& metric_content = queue_metrics.getContent(); + for (const auto &payload_content : metric_content) { uint64_t size = 0; uint64_t max = 0; - for (auto content : payload_content.operation_arguments) { + for (const auto& content : payload_content.operation_arguments) { if (content.first == "datasize") { - size = std::stol(content.second.to_string()); + size = std::stoull(content.second.to_string()); } else if (content.first == "datasizemax") { - max = std::stol(content.second.to_string()); + max = std::stoull(content.second.to_string()); } } std::lock_guard<std::mutex> lock(controller_mutex_); @@ -293,7 +292,7 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) { } } - parse_content(content); + parse_content(payload.getContent()); std::vector<uint8_t> buffer; buffer.resize(1024); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index f8e8c31..6a4077e 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -325,8 +325,8 @@ void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr< * */ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) { - std::shared_ptr<ResourceClaim> claim = content_session_->create(); - int max_read = getpagesize(); + const std::shared_ptr<ResourceClaim> claim = content_session_->create(); + const auto max_read = gsl::narrow_cast<size_t>(getpagesize()); std::vector<uint8_t> charBuffer(max_read); try { @@ -336,10 +336,10 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c if (nullptr == content_stream) { throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath()); } - int position = 0; - const int max_size = gsl::narrow<int>(stream.size()); + size_t position = 0; + const auto max_size = stream.size(); while (position < max_size) { - const int read_size = std::min(max_read, max_size - position); + const auto read_size = std::min(max_read, max_size - position); stream.read(charBuffer, read_size); content_stream->write(charBuffer.data(), read_size); diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp index 8a21f73..d62fac8 100644 --- a/libminifi/src/core/ProcessSessionReadCallback.cpp +++ b/libminifi/src/core/ProcessSessionReadCallback.cpp @@ -24,6 +24,7 @@ #include "core/logging/LoggerConfiguration.h" #include "io/BaseStream.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -47,20 +48,16 @@ int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream size_t size = 0; uint8_t buffer[8192]; do { - int read = stream->read(buffer, 8192); - if (read < 0) { - return -1; - } - if (read == 0) { - break; - } + const auto read = stream->read(buffer, 8192); + if (read == static_cast<size_t>(-1)) return -1; + if (read == 0) break; if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) { return -1; } size += read; } while (size < stream->size()); _writeSucceeded = true; - return size; + return gsl::narrow<int64_t>(size); } // Renames tmp file to final destination diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp index 6d3ebab..12220ef 100644 --- a/libminifi/src/io/BufferStream.cpp +++ b/libminifi/src/io/BufferStream.cpp @@ -36,17 +36,16 @@ int BufferStream::write(const uint8_t *value, int size) { return size; } -int BufferStream::read(uint8_t *buf, int len) { - gsl_Expects(len >= 0); - int bytes_available_in_buffer = gsl::narrow<int>(buffer_.size() - readOffset_); - len = std::min(len, bytes_available_in_buffer); - auto begin = buffer_.begin() + readOffset_; - std::copy(begin, begin + len, buf); +size_t BufferStream::read(uint8_t *buf, size_t len) { + const auto bytes_available_in_buffer = buffer_.size() - readOffset_; + const auto readlen = std::min(len, bytes_available_in_buffer); + auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_); + std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf); // increase offset for the next read - readOffset_ += len; + readOffset_ += readlen; - return len; + return readlen; } } /* namespace io */ diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index 6427447..cc65a8b 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -47,6 +47,8 @@ #include "core/logging/LoggerConfiguration.h" #include "utils/file/FileUtils.h" #include "utils/GeneralUtils.h" +#include "utils/gsl.h" + namespace util = org::apache::nifi::minifi::utils; namespace mio = org::apache::nifi::minifi::io; @@ -523,9 +525,8 @@ int Socket::write(const uint8_t *value, int size) { return bytes; } -int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) { - gsl_Expects(buflen >= 0); - int32_t total_read = 0; +size_t Socket::read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) { + size_t total_read = 0; while (buflen) { int16_t fd = select_descriptor(1000); if (fd < 0) { @@ -533,9 +534,9 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) { logger_->log_debug("fd %d close %i", fd, buflen); utils::file::FileUtils::close(socket_file_descriptor_); } - return -1; + return static_cast<size_t>(-1); } - int bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0); + const auto bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0); logger_->log_trace("Recv call %d", bytes_read); if (bytes_read <= 0) { if (bytes_read == 0) { @@ -545,23 +546,23 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) { int err = WSAGetLastError(); if (err == WSAEWOULDBLOCK) { // continue - return -2; + return static_cast<size_t>(-2); } logger_->log_error("Could not recv on %d (port %d), error code: %d", fd, port_, err); #else if (errno == EAGAIN || errno == EWOULDBLOCK) { // continue - return -2; + return static_cast<size_t>(-2); } logger_->log_error("Could not recv on %d (port %d), error: %s", fd, port_, strerror(errno)); #endif // WIN32 } - return -1; + return static_cast<size_t>(-1); } - buflen -= bytes_read; + buflen -= gsl::narrow<size_t>(bytes_read); buf += bytes_read; - total_read += bytes_read; + total_read += gsl::narrow<size_t>(bytes_read); if (!retrieve_all_bytes) { break; } diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp index 2b5fc70..6330b5e 100644 --- a/libminifi/src/io/DescriptorStream.cpp +++ b/libminifi/src/io/DescriptorStream.cpp @@ -40,7 +40,7 @@ DescriptorStream::DescriptorStream(int fd) logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) { } -void DescriptorStream::seek(uint64_t offset) { +void DescriptorStream::seek(size_t offset) { std::lock_guard<std::recursive_mutex> lock(file_lock_); #ifdef WIN32 _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00); @@ -70,25 +70,24 @@ int DescriptorStream::write(const uint8_t *value, int size) { } } -int DescriptorStream::read(uint8_t *buf, int buflen) { - gsl_Expects(buflen >= 0); +size_t DescriptorStream::read(uint8_t *buf, size_t buflen) { if (buflen == 0) { return 0; } if (!IsNullOrEmpty(buf)) { #ifdef WIN32 - auto size_read = _read(fd_, buf, buflen); + const auto size_read = _read(fd_, buf, buflen); #else - auto size_read = ::read(fd_, buf, buflen); + const auto size_read = ::read(fd_, buf, buflen); #endif if (size_read < 0) { - return -1; + return static_cast<size_t>(-1); } - return size_read; + return gsl::narrow<size_t>(size_read); } else { - return -1; + return static_cast<size_t>(-1); } } diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp index 4f759ae..c88f8dd 100644 --- a/libminifi/src/io/FileStream.cpp +++ b/libminifi/src/io/FileStream.cpp @@ -97,13 +97,13 @@ void FileStream::close() { file_stream_.reset(); } -void FileStream::seek(uint64_t offset) { +void FileStream::seek(size_t offset) { std::lock_guard<std::mutex> lock(file_lock_); if (file_stream_ == nullptr || !file_stream_->is_open()) { logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG; return; } - offset_ = gsl::narrow<size_t>(offset); + offset_ = offset; file_stream_->clear(); if (!file_stream_->seekg(offset_)) logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKG_CALL_ERROR_MSG; @@ -142,8 +142,7 @@ int FileStream::write(const uint8_t *value, int size) { } } -int FileStream::read(uint8_t *buf, int buflen) { - gsl_Expects(buflen >= 0); +size_t FileStream::read(uint8_t *buf, size_t buflen) { if (buflen == 0) { return 0; } @@ -151,32 +150,31 @@ int FileStream::read(uint8_t *buf, int buflen) { std::lock_guard<std::mutex> lock(file_lock_); if (file_stream_ == nullptr || !file_stream_->is_open()) { logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG; - return -1; + return static_cast<size_t>(-1); } - file_stream_->read(reinterpret_cast<char*>(buf), buflen); + file_stream_->read(reinterpret_cast<char*>(buf), gsl::narrow<std::streamsize>(buflen)); if (file_stream_->eof() || file_stream_->fail()) { file_stream_->clear(); seekToEndOfFile(READ_ERROR_MSG); auto tellg_result = file_stream_->tellg(); if (tellg_result == std::streampos(-1)) { logging::LOG_ERROR(logger_) << READ_ERROR_MSG << TELLG_CALL_ERROR_MSG; - return -1; + return static_cast<size_t>(-1); } - size_t len = gsl::narrow<size_t>(tellg_result); + const auto len = gsl::narrow<size_t>(tellg_result); size_t ret = len - offset_; offset_ = len; length_ = len; logging::LOG_DEBUG(logger_) << path_ << " eof bit, ended at " << offset_; - return gsl::narrow<int>(ret); + return ret; } else { offset_ += buflen; - file_stream_->seekp(offset_); + file_stream_->seekp(gsl::narrow<std::streamoff>(offset_)); return buflen; } - } else { logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_BUFFER_ERROR_MSG; - return -1; + return static_cast<size_t>(-1); } } diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp index cdd101e..cd4166b 100644 --- a/libminifi/src/io/InputStream.cpp +++ b/libminifi/src/io/InputStream.cpp @@ -30,42 +30,43 @@ namespace nifi { namespace minifi { namespace io { -int InputStream::read(std::vector<uint8_t>& buffer, int len) { - if (buffer.size() < gsl::narrow<size_t>(len)) { +size_t InputStream::read(std::vector<uint8_t>& buffer, size_t len) { + if (buffer.size() < len) { buffer.resize(len); } - int ret = read(buffer.data(), len); - buffer.resize((std::max)(ret, 0)); + const auto ret = read(buffer.data(), len); + if (ret == static_cast<size_t>(-1)) return ret; + buffer.resize((std::max)(ret, size_t{0})); return ret; } -int InputStream::read(bool &value) { +size_t InputStream::read(bool &value) { uint8_t buf = 0; if (read(&buf, 1) != 1) { - return -1; + return static_cast<size_t>(-1); } value = buf; return 1; } -int InputStream::read(utils::Identifier &value) { +size_t InputStream::read(utils::Identifier &value) { std::string uuidStr; - int ret = read(uuidStr); + const auto ret = read(uuidStr); if (ret < 0) { return ret; } auto optional_uuid = utils::Identifier::parse(uuidStr); if (!optional_uuid) { - return -1; + return static_cast<size_t>(-1); } value = optional_uuid.value(); return ret; } -int InputStream::read(std::string &str, bool widen) { +size_t InputStream::read(std::string &str, bool widen) { uint32_t len = 0; - int ret = 0; + size_t ret = 0; if (!widen) { uint16_t shortLength = 0; ret = read(shortLength); @@ -84,9 +85,9 @@ int InputStream::read(std::string &str, bool widen) { } std::vector<uint8_t> buffer(len); - uint32_t bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len)); + const auto bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len)); if (bytes_read != len) { - return -1; + return static_cast<size_t>(-1); } str = std::string(reinterpret_cast<const char*>(buffer.data()), len); diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp index c5b0b3f..66d23b5 100644 --- a/libminifi/src/io/tls/SecureDescriptorStream.cpp +++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp @@ -34,7 +34,7 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL *ssl) logger_(logging::LoggerFactory<SecureDescriptorStream>::getLogger()) { } -void SecureDescriptorStream::seek(uint64_t offset) { +void SecureDescriptorStream::seek(size_t offset) { std::lock_guard<std::recursive_mutex> lock(file_lock_); #ifdef WIN32 _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00); @@ -71,34 +71,29 @@ int SecureDescriptorStream::write(const uint8_t *value, int size) { } } -int SecureDescriptorStream::read(uint8_t *buf, int buflen) { - gsl_Expects(buflen >= 0); +size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) { if (buflen == 0) { return 0; } - if (!IsNullOrEmpty(buf)) { - int total_read = 0; - int status = 0; - while (buflen) { - int sslStatus; - do { - status = SSL_read(ssl_, buf, buflen); - sslStatus = SSL_get_error(ssl_, status); - } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); + if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1); + size_t total_read = 0; + uint8_t* writepos = buf; + while (buflen > total_read) { + int status; + int sslStatus; + do { + status = SSL_read(ssl_, writepos, gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max())))); + sslStatus = SSL_get_error(ssl_, status); + } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); - if (status < 0) - break; + if (status < 0) + break; - buflen -= status; - buf += status; - total_read += status; - } - - return total_read; - - } else { - return -1; + writepos += status; + total_read += gsl::narrow<size_t>(status); } + + return total_read; } } /* namespace io */ diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index 776fa44..49d07ec 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -351,36 +351,37 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) { return -1; } -int TLSSocket::read(uint8_t *buf, int buflen, bool /*retrieve_all_bytes*/) { - gsl_Expects(buflen >= 0); - int total_read = 0; +size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/) { + size_t total_read = 0; int status = 0; int loc = 0; int16_t fd = select_descriptor(1000); if (fd < 0) { close(); - return -1; + return static_cast<size_t>(-1); } auto fd_ssl = get_ssl(fd); if (IsNullOrEmpty(fd_ssl)) { - return -1; + return static_cast<size_t>(-1); } if (!SSL_pending(fd_ssl)) { return 0; } while (buflen) { if (fd <= 0) { - return -1; + return static_cast<size_t>(-1); } int sslStatus; do { - status = SSL_read(fd_ssl, buf + loc, buflen); + status = SSL_read(fd_ssl, buf + loc, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())))); sslStatus = SSL_get_error(fd_ssl, status); } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ && SSL_pending(fd_ssl)); - buflen -= status; + if (status < 0) break; + + buflen -= gsl::narrow<size_t>(status); loc += status; - total_read += status; + total_read += gsl::narrow<size_t>(status); } return total_read; @@ -418,33 +419,32 @@ int TLSSocket::write(const uint8_t *value, int size) { return writeData(value, size, fd); } -int TLSSocket::read(uint8_t *buf, int buflen) { - gsl_Expects(buflen >= 0); - int total_read = 0; +size_t TLSSocket::read(uint8_t *buf, size_t buflen) { + size_t total_read = 0; int status = 0; while (buflen) { - int16_t fd = select_descriptor(1000); + const int16_t fd = select_descriptor(1000); if (fd < 0) { close(); - return -1; + return static_cast<size_t>(-1); } int sslStatus; do { auto fd_ssl = get_ssl(fd); if (IsNullOrEmpty(fd_ssl)) { - return -1; + return static_cast<size_t>(-1); } - status = SSL_read(fd_ssl, buf, buflen); + status = SSL_read(fd_ssl, buf, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())))); sslStatus = SSL_get_error(fd_ssl, status); } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); if (status < 0) break; - buflen -= status; + buflen -= gsl::narrow<size_t>(status); buf += status; - total_read += status; + total_read += gsl::narrow<size_t>(status); } return total_read; diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index f9af7f9..2fac08b 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -235,12 +235,10 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo } bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) { - int ret; - org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize)); - ret = outStream.read(uuid_); - if (ret <= 0) { + auto ret = outStream.read(uuid_); + if (ret == static_cast<size_t>(-1)) { return false; } @@ -272,22 +270,22 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff } ret = outStream.read(this->_componentId); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } ret = outStream.read(this->_componentType); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } ret = outStream.read(this->flow_uuid_); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } ret = outStream.read(this->_details); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } @@ -301,19 +299,19 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff for (uint32_t i = 0; i < numAttributes; i++) { std::string key; ret = outStream.read(key); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } std::string value; ret = outStream.read(value); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } this->_attributes[key] = value; } ret = outStream.read(this->_contentFullPath); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } @@ -328,7 +326,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff } ret = outStream.read(this->_sourceQueueIdentifier); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } @@ -343,7 +341,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff for (uint32_t i = 0; i < number; i++) { utils::Identifier parentUUID; ret = outStream.read(parentUUID); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } this->addParentUuid(parentUUID); @@ -356,23 +354,23 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff for (uint32_t i = 0; i < number; i++) { utils::Identifier childUUID; ret = outStream.read(childUUID); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } this->addChildUuid(childUUID); } } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { ret = outStream.read(this->_transitUri); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { ret = outStream.read(this->_transitUri); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } ret = outStream.read(this->_sourceSystemFlowFileIdentifier); - if (ret <= 0) { + if (ret == static_cast<size_t>(-1)) { return false; } } diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp index 7ea213f..0eb1f7e 100644 --- a/libminifi/src/sitetosite/RawSocketProtocol.cpp +++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp @@ -342,40 +342,40 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) { } uint32_t number; - status = peer_->read(number); + auto ret = peer_->read(number); - if (status <= 0) { + if (ret == static_cast<size_t>(-1)) { tearDown(); return false; } for (uint32_t i = 0; i < number; i++) { std::string host; - status = peer_->read(host); - if (status <= 0) { + ret = peer_->read(host); + if (ret == static_cast<size_t>(-1)) { tearDown(); return false; } uint32_t port; - status = peer_->read(port); - if (status <= 0) { + ret = peer_->read(port); + if (ret == static_cast<size_t>(-1)) { tearDown(); return false; } uint8_t secure; - status = peer_->read(secure); - if (status <= 0) { + ret = peer_->read(secure); + if (ret == static_cast<size_t>(-1)) { tearDown(); return false; } uint32_t count; - status = peer_->read(count); - if (status <= 0) { + ret = peer_->read(count); + if (ret == static_cast<size_t>(-1)) { tearDown(); return false; } - PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true); - peers.push_back(std::move(status)); + PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true); + peers.push_back(std::move(peer_status)); logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure); } @@ -397,15 +397,14 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) { int RawSiteToSiteClient::readRequestType(RequestType &type) { std::string requestTypeStr; - int ret = peer_->read(requestTypeStr); - - if (ret <= 0) - return ret; + const auto ret = peer_->read(requestTypeStr); + if (ret == static_cast<size_t>(-1)) + return gsl::narrow_cast<int>(ret); for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) { if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) { type = (RequestType) i; - return ret; + return gsl::narrow_cast<int>(ret); } } @@ -417,7 +416,7 @@ int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transac } int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) { - return writeResponse(transaction, code, message); + return writeResponse(transaction, code, std::move(message)); } bool RawSiteToSiteClient::negotiateCodec() { diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index 1833144..fb24f73 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -32,7 +32,7 @@ namespace sitetosite { int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) { uint8_t firstByte; - int ret = peer_->read(firstByte); + auto ret = peer_->read(firstByte); if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) return -1; @@ -48,27 +48,25 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac ret = peer_->read(thirdByte); - if (ret <= 0) - return ret; + if (ret != static_cast<size_t>(-1)) + return gsl::narrow_cast<int>(ret); code = (RespondCode) thirdByte; RespondCodeContext *resCode = this->getRespondCodeContext(code); - - if (resCode == NULL) { - // Not a valid respond code + if (!resCode) { return -1; } if (resCode->hasDescription) { ret = peer_->read(message); - if (ret <= 0) + if (ret != static_cast<size_t>(-1)) return -1; } return gsl::narrow<int>(3 + message.size()); } void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) { - std::shared_ptr<Transaction> transaction = NULL; + std::shared_ptr<Transaction> transaction; auto it = this->known_transactions_.find(transactionID); @@ -85,7 +83,7 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) { RespondCodeContext *resCode = this->getRespondCodeContext(code); - if (resCode == NULL) { + if (!resCode) { // Not a valid respond code return -1; } @@ -205,7 +203,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) { int ret; - std::shared_ptr<Transaction> transaction = NULL; + std::shared_ptr<Transaction> transaction; if (peer_state_ != READY) { bootstrap(); @@ -529,8 +527,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke } bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) { - int ret; - std::shared_ptr<Transaction> transaction = NULL; + std::shared_ptr<Transaction> transaction; if (peer_state_ != READY) { bootstrap(); @@ -568,9 +565,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke RespondCode code; std::string message; - ret = readResponse(transaction, code, message); - - if (ret <= 0) { + if (readResponse(transaction, code, message) <= 0) { return false; } if (code == CONTINUE_TRANSACTION) { @@ -595,8 +590,8 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke // start to read the packet uint32_t numAttributes; - ret = transaction->getStream().read(numAttributes); - if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) { + auto ret = transaction->getStream().read(numAttributes); + if (ret != static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) { return false; } @@ -606,11 +601,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke std::string key; std::string value; ret = transaction->getStream().read(key, true); - if (ret <= 0) { + if (ret != static_cast<size_t>(-1)) { return false; } ret = transaction->getStream().read(value, true); - if (ret <= 0) { + if (ret != static_cast<size_t>(-1)) { return false; } packet->_attributes[key] = value; @@ -619,7 +614,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke uint64_t len; ret = transaction->getStream().read(len); - if (ret <= 0) { + if (ret != static_cast<size_t>(-1)) { return false; } diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp index 33c9634..a64c3ac 100644 --- a/libminifi/src/utils/ByteArrayCallback.cpp +++ b/libminifi/src/utils/ByteArrayCallback.cpp @@ -35,10 +35,10 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea if (stream->size() > 0) { std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]); readFully(buffer.get(), stream->size()); - stream->read(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(stream->size())); - return stream->size(); + stream->read(reinterpret_cast<uint8_t*>(buffer.get()), stream->size()); + return gsl::narrow<int64_t>(stream->size()); } - return size_.load(); + return gsl::narrow<int64_t>(size_.load()); } int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) { @@ -46,7 +46,7 @@ int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& str std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]); auto written = readFully(buffer.get(), size_); stream->write(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(written)); - return stream->size(); + return gsl::narrow<int64_t>(stream->size()); } void StreamOutputCallback::write(char *data, size_t size) { @@ -55,7 +55,7 @@ void StreamOutputCallback::write(char *data, size_t size) { write_and_notify(data, size); } -const std::vector<char> ByteOutputCallback::to_string() { +std::vector<char> ByteOutputCallback::to_string() { std::vector<char> buffer; buffer.resize(size_.load()); readFully(buffer.data(), size_.load()); diff --git a/libminifi/src/utils/FileOutputCallback.cpp b/libminifi/src/utils/FileOutputCallback.cpp index 6dfa565..bbd1265 100644 --- a/libminifi/src/utils/FileOutputCallback.cpp +++ b/libminifi/src/utils/FileOutputCallback.cpp @@ -34,7 +34,7 @@ int64_t FileOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea return size_.load(); } -const std::vector<char> FileOutputCallback::to_string() { +std::vector<char> FileOutputCallback::to_string() { std::vector<char> buffer; buffer.insert(std::end(buffer), std::begin(file_), std::end(file_)); return buffer; diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h index 2b4bf73..312ca76 100644 --- a/libminifi/test/BufferReader.h +++ b/libminifi/test/BufferReader.h @@ -26,14 +26,14 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback { public: explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){} - int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) { + size_t write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) { uint8_t tmpBuffer[4096]{}; std::size_t remaining_len = len; - int total_read = 0; + size_t total_read = 0; while (remaining_len > 0) { - auto ret = input.read(tmpBuffer, gsl::narrow<int>(std::min(remaining_len, sizeof(tmpBuffer)))); + const auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer))); if (ret == 0) break; - if (ret < 0) return ret; + if (ret == static_cast<size_t>(-1)) return ret; remaining_len -= ret; total_read += ret; auto prevSize = buffer_.size(); @@ -44,7 +44,7 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback { } int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) { - return write(*stream.get(), stream->size()); + return static_cast<int64_t>(write(*stream.get(), stream->size())); } private: diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp index 76507df..3d0da83 100644 --- a/libminifi/test/archive-tests/CompressContentTests.cpp +++ b/libminifi/test/archive-tests/CompressContentTests.cpp @@ -41,29 +41,23 @@ #include "processors/PutFile.h" #include "utils/file/FileUtils.h" #include "../Utils.h" +#include "utils/gsl.h" class ReadCallback: public minifi::InputStreamCallback { public: - explicit ReadCallback(size_t size) : - read_size_(0) { - buffer_size_ = size; - buffer_ = new uint8_t[buffer_size_]; - archive_buffer_ = nullptr; - archive_buffer_size_ = 0; - } - ~ReadCallback() { - if (buffer_) - delete[] buffer_; - if (archive_buffer_) - delete[] archive_buffer_; - } - int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) { + explicit ReadCallback(size_t size) + :buffer_size_{size} + {} + ~ReadCallback() override { + delete[] buffer_; + delete[] archive_buffer_; + } + int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override { int64_t total_read = 0; - int64_t ret = 0; do { - ret = stream->read(buffer_ + read_size_, gsl::narrow<int>(buffer_size_ - read_size_)); + const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_); if (ret == 0) break; - if (ret < 0) return ret; + if (ret == static_cast<size_t>(-1)) return -1; read_size_ += gsl::narrow<size_t>(ret); total_read += ret; } while (buffer_size_ != read_size_); @@ -78,18 +72,18 @@ class ReadCallback: public minifi::InputStreamCallback { struct archive_entry *ae; REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK); - int size = gsl::narrow<int>(archive_entry_size(ae)); + const auto size = archive_entry_size(ae); archive_buffer_ = new char[size]; archive_buffer_size_ = size; - archive_read_data(a, archive_buffer_, size); + archive_read_data(a, archive_buffer_, gsl::narrow<size_t>(size)); archive_read_free(a); } - uint8_t *buffer_; size_t buffer_size_; - size_t read_size_; - char *archive_buffer_; - int archive_buffer_size_; + uint8_t *buffer_ = new uint8_t[buffer_size_]; + size_t read_size_ = 0; + char *archive_buffer_ = nullptr; + int64_t archive_buffer_size_ = 0; }; /** diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp index 60e233f..074b153 100644 --- a/libminifi/test/archive-tests/MergeFileTests.cpp +++ b/libminifi/test/archive-tests/MergeFileTests.cpp @@ -88,9 +88,9 @@ class FixedBuffer : public minifi::InputStreamCallback { REQUIRE(size_ + len <= capacity_); int total_read = 0; do { - auto ret = input.read(end(), gsl::narrow<int>(len)); + const auto ret = input.read(end(), len); if (ret == 0) break; - if (ret < 0) return ret; + if (ret == static_cast<size_t>(-1)) return -1; size_ += ret; len -= ret; total_read += ret; @@ -98,7 +98,7 @@ class FixedBuffer : public minifi::InputStreamCallback { return total_read; } int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) { - return write(*stream.get(), capacity_); + return write(*stream, capacity_); } private: @@ -111,8 +111,8 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& input) { class ArchiveEntryReader { public: explicit ArchiveEntryReader(archive* arch) : arch(arch) {} - int read(uint8_t* out, std::size_t len) { - return gsl::narrow<int>(archive_read_data(arch, out, len)); + size_t read(uint8_t* out, std::size_t len) { + return gsl::narrow_cast<size_t>(archive_read_data(arch, out, len)); } private: archive* arch; diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp index 059dfa9..d2e6d51 100644 --- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp +++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp @@ -56,12 +56,11 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr< str = ""; uint8_t buffer[4096]{}; while (true) { - auto ret = stream->read(buffer, sizeof(buffer)); + const auto ret = stream->read(buffer, sizeof(buffer)); REQUIRE(ret >= 0); - if (ret == 0) { - break; - } - str += std::string{reinterpret_cast<char*>(buffer), static_cast<std::size_t>(ret)}; + REQUIRE(ret != static_cast<size_t>(-1)); + if (ret == 0) { break; } + str += std::string{reinterpret_cast<char*>(buffer), ret}; } return stream; } diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp index 6409f3e..7b5c75c 100644 --- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -104,7 +104,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") { std::string readstr; // -1 tell us we have an invalid stream - REQUIRE(read_stream->read(readstr) == -1); + REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1)); } TEST_CASE("Test Empty Claim", "[TestDBCR3]") { @@ -140,7 +140,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") { std::string readstr; // -1 tell us we have an invalid stream - REQUIRE(read_stream->read(readstr) == -1); + REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1)); } TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") { diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp index 88b3929..5e7c3b6 100644 --- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp +++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp @@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") { minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get())); - REQUIRE(nonExistingStream.read(nullptr, 0) == -1); + REQUIRE(nonExistingStream.read(nullptr, 0) == static_cast<size_t>(-1)); } diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp index b13dcfa..1ff4ab9 100644 --- a/libminifi/test/unit/FileStreamTests.cpp +++ b/libminifi/test/unit/FileStreamTests.cpp @@ -45,7 +45,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") { minifi::io::FileStream stream(path, 0, true); std::vector<uint8_t> readBuffer; - REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(readBuffer, stream.size()) == stream.size()); uint8_t* data = readBuffer.data(); @@ -59,7 +59,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") { std::vector<uint8_t> verifybuffer; - REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size()); data = verifybuffer.data(); @@ -83,7 +83,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") { minifi::io::FileStream stream(path, 0, true); std::vector<uint8_t> readBuffer; - REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(readBuffer, stream.size()) == stream.size()); uint8_t* data = readBuffer.data(); @@ -97,7 +97,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") { std::vector<uint8_t> verifybuffer; - REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size()); data = verifybuffer.data(); @@ -121,7 +121,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") { minifi::io::FileStream stream(path, 0, true); std::vector<uint8_t> readBuffer; - REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(readBuffer, stream.size()) == stream.size()); uint8_t* data = readBuffer.data(); @@ -135,7 +135,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") { std::vector<uint8_t> verifybuffer; - REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size()); data = verifybuffer.data(); @@ -159,7 +159,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") { minifi::io::FileStream stream(path, 0, true); std::vector<uint8_t> readBuffer; - REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(readBuffer, stream.size()) == stream.size()); uint8_t* data = readBuffer.data(); @@ -173,11 +173,11 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") { std::vector<uint8_t> verifybuffer; - REQUIRE(stream.read(nullptr, gsl::narrow<int>(stream.size())) == -1); + REQUIRE(stream.read(nullptr, stream.size()) == static_cast<size_t>(-1)); data = verifybuffer.data(); - REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == ""); + REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()).empty()); std::remove(ss.str().c_str()); } @@ -197,7 +197,7 @@ TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") { minifi::io::FileStream stream(path, 0, true); std::vector<uint8_t> readBuffer; - REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(readBuffer, stream.size()) == stream.size()); uint8_t* data = readBuffer.data(); @@ -232,7 +232,7 @@ TEST_CASE("TestFileExceedSize", "[TestLoader]") { minifi::io::FileStream stream(path, 0, true); std::vector<uint8_t> readBuffer; - REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size()); + REQUIRE(stream.read(readBuffer, stream.size()) == stream.size()); uint8_t* data = readBuffer.data(); @@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") { REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0))); std::vector<uint8_t> readBuffer; stream.seek(0); - REQUIRE(stream.read(readBuffer, 1) == -1); + REQUIRE(stream.read(readBuffer, 1) == static_cast<size_t>(-1)); REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid file stream", std::chrono::seconds(0))); } @@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") { REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0))); std::vector<uint8_t> readBuffer; stream.seek(0); - REQUIRE_FALSE(stream.read(readBuffer, 11) == -1); + REQUIRE_FALSE(stream.read(readBuffer, 11) == static_cast<size_t>(-1)); REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading from file", std::chrono::seconds(0))); stream.seek(0); - REQUIRE(stream.read(nullptr, 11) == -1); + REQUIRE(stream.read(nullptr, 11) == static_cast<size_t>(-1)); REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid buffer", std::chrono::seconds(0))); } diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h index b4d5e8f..61257e2 100644 --- a/libminifi/test/unit/SiteToSiteHelper.h +++ b/libminifi/test/unit/SiteToSiteHelper.h @@ -59,7 +59,7 @@ class SiteToSiteResponder : public minifi::io::BaseStream { * @param len length to read * @return resulting read size **/ - int read(uint8_t *value, int len) override { + size_t read(uint8_t *value, size_t len) override { return server_responses_.read(value, len); } diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index b339a28..ca60e7f 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -459,7 +459,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) { std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo); auto stream = (*content_repo)->read(*claim); - return stream->read(target, size); + return static_cast<int>(stream->read(target, static_cast<size_t>(size))); } else { file_buffer fb = file_to_buffer(ff->contentLocation); if (size < 0) { diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp index a1f8a08..ad3c086 100644 --- a/nanofi/tests/CSite2SiteTests.cpp +++ b/nanofi/tests/CSite2SiteTests.cpp @@ -39,6 +39,7 @@ #include "core/cstructs.h" #include "RandomServerSocket.h" #include "core/log.h" +#include "utils/gsl.h" #define FMT_DEFAULT fmt_lower @@ -145,15 +146,15 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& transfe size_t read_len = 0; while(!found_codec) { uint8_t handshake_data[1000]; - int actual_len = stream->read(handshake_data+read_len, 1000-read_len); - if(actual_len <= 0) { + const auto actual_len = stream->read(handshake_data+read_len, 1000-read_len); + if(actual_len == 0 || actual_len == static_cast<size_t>(-1)) { continue; } read_len += actual_len; - std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len); - auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end()); + const std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len); + const auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end()); if(it != incoming_data.end()){ - size_t idx = std::distance(incoming_data.begin(), it); + const auto idx = gsl::narrow<size_t>(std::distance(incoming_data.begin(), it)); // Actual version follows the string as an uint32_t // that should be the end of the buffer found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len; }
