This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 5a3f5464cb9c6c95be79c270d3a793984621d190 Author: Marton Szasz <[email protected]> AuthorDate: Wed May 26 15:49:50 2021 +0200 MINIFICPP-1507 convert OutputStream::write to size_t Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1083 --- controller/Controller.h | 14 +- extensions/aws/s3/S3Wrapper.cpp | 20 +- extensions/bustache/ApplyTemplate.cpp | 3 +- extensions/civetweb/processors/ListenHTTP.cpp | 23 ++- extensions/http-curl/client/HTTPStream.cpp | 33 ++-- extensions/http-curl/client/HTTPStream.h | 13 +- extensions/http-curl/tests/HTTPHandlers.h | 2 +- .../http-curl/tests/unit/InvokeHTTPTests.cpp | 3 +- extensions/jni/jvm/JniReferenceObjects.h | 4 +- extensions/libarchive/CompressContent.h | 19 +- extensions/libarchive/MergeContent.h | 67 +++---- extensions/libarchive/UnfocusArchiveEntry.cpp | 6 +- extensions/librdkafka/ConsumeKafka.cpp | 9 +- extensions/mqtt/processors/ConsumeMQTT.h | 24 ++- extensions/opc/include/fetchopc.h | 4 +- extensions/opencv/CaptureRTSPFrame.h | 15 +- extensions/opencv/FrameIO.h | 7 +- .../SourceInitiatedSubscriptionListener.cpp | 3 +- .../rocksdb-repos/DatabaseContentRepository.cpp | 8 +- extensions/rocksdb-repos/RocksDbStream.cpp | 46 ++--- extensions/rocksdb-repos/RocksDbStream.h | 2 +- extensions/script/lua/LuaBaseStream.cpp | 2 +- extensions/script/python/PyBaseStream.cpp | 5 +- extensions/sensors/SensorBase.h | 23 +-- extensions/sftp/client/SFTPClient.cpp | 10 +- .../processors/ExecuteProcess.h | 10 +- .../processors/GenerateFlowFile.cpp | 2 +- .../processors/GenerateFlowFile.h | 22 +-- extensions/standard-processors/processors/GetTCP.h | 3 +- .../standard-processors/processors/ListenSyslog.h | 15 +- .../standard-processors/processors/TailFile.cpp | 15 +- .../tests/integration/SecureSocketGetTCPTest.cpp | 20 +- .../TLSServerSocketSupportedProtocolsTest.cpp | 16 +- .../standard-processors/tests/unit/GetTCPTests.cpp | 6 +- extensions/usb-camera/CMakeLists.txt | 2 +- extensions/usb-camera/GetUSBCamera.cpp | 10 +- .../CollectorInitiatedSubscription.cpp | 9 +- .../windows-event-log/ConsumeWindowsEventLog.cpp | 3 +- libminifi/include/io/AtomicEntryStream.h | 26 ++- libminifi/include/io/BufferStream.h | 4 +- libminifi/include/io/CRCStream.h | 6 +- libminifi/include/io/ClientSocket.h | 2 +- libminifi/include/io/DescriptorStream.h | 2 +- libminifi/include/io/FileStream.h | 2 +- libminifi/include/io/OutputStream.h | 30 ++- libminifi/include/io/Stream.h | 10 +- libminifi/include/io/StreamPipe.h | 6 +- libminifi/include/io/ZlibStream.h | 4 +- libminifi/include/io/tls/SecureDescriptorStream.h | 2 +- libminifi/include/io/tls/TLSServerSocket.h | 8 +- libminifi/include/io/tls/TLSSocket.h | 4 +- .../include/serialization/FlowFileV3Serializer.h | 4 +- libminifi/include/sitetosite/Peer.h | 2 +- libminifi/include/sitetosite/SiteToSiteClient.h | 2 +- libminifi/include/utils/GeneralUtils.h | 2 + libminifi/src/FlowFileRecord.cpp | 99 +++++----- libminifi/src/c2/ControllerSocketProtocol.cpp | 10 +- libminifi/src/core/ContentSession.cpp | 8 +- libminifi/src/core/ProcessSession.cpp | 6 +- libminifi/src/io/BufferStream.cpp | 3 +- libminifi/src/io/ClientSocket.cpp | 19 +- libminifi/src/io/DescriptorStream.cpp | 22 +-- libminifi/src/io/FileStream.cpp | 49 +++-- libminifi/src/io/OutputStream.cpp | 24 +-- libminifi/src/io/ZlibStream.cpp | 26 ++- libminifi/src/io/tls/SecureDescriptorStream.cpp | 34 ++-- libminifi/src/io/tls/TLSServerSocket.cpp | 79 ++++---- libminifi/src/io/tls/TLSSocket.cpp | 22 +-- libminifi/src/provenance/Provenance.cpp | 206 ++++++++++++--------- .../src/serialization/FlowFileV3Serializer.cpp | 103 +++++------ libminifi/src/sitetosite/Peer.cpp | 7 +- libminifi/src/sitetosite/RawSocketProtocol.cpp | 21 ++- libminifi/src/sitetosite/SiteToSiteClient.cpp | 88 +++++---- libminifi/src/utils/ByteArrayCallback.cpp | 2 +- libminifi/test/archive-tests/MergeFileTests.cpp | 4 +- .../test/persistence-tests/PersistenceTests.cpp | 2 +- .../test/rocksdb-tests/ContentSessionTests.cpp | 5 +- .../rocksdb-tests/DBContentRepositoryTests.cpp | 2 +- .../test/rocksdb-tests/RocksDBStreamTests.cpp | 10 +- libminifi/test/sql-tests/SQLTestPlan.h | 2 +- libminifi/test/unit/CRCTests.cpp | 14 +- libminifi/test/unit/FileStreamTests.cpp | 10 +- libminifi/test/unit/FlowFileSerializationTests.cpp | 4 +- libminifi/test/unit/SiteToSiteHelper.h | 4 +- libminifi/test/unit/SocketTests.cpp | 59 +++--- libminifi/test/unit/ZlibStreamTests.cpp | 31 ++-- nanofi/src/api/nanofi.cpp | 6 +- 87 files changed, 793 insertions(+), 802 deletions(-) diff --git a/controller/Controller.h b/controller/Controller.h index 40fb046..99c475f 100644 --- a/controller/Controller.h +++ b/controller/Controller.h @@ -36,7 +36,7 @@ bool sendSingleCommand(std::unique_ptr<minifi::io::Socket> socket, uint8_t op, c minifi::io::BufferStream stream; stream.write(&op, 1); stream.write(value); - return static_cast<size_t>(socket->write(const_cast<uint8_t*>(stream.getBuffer()), gsl::narrow<int>(stream.size()))) == stream.size(); + return socket->write(stream.getBuffer(), stream.size()) == stream.size(); } /** @@ -77,7 +77,7 @@ int updateFlow(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out, st stream.write(&op, 1); stream.write("flow"); stream.write(file); - if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), gsl::narrow<int>(stream.size())) < 0) { + if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) { return -1; } // read the response @@ -107,7 +107,7 @@ int getFullConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream minifi::io::BufferStream stream; stream.write(&op, 1); stream.write("getfull"); - if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), gsl::narrow<int>(stream.size())) < 0) { + if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) { return -1; } // read the response @@ -133,7 +133,7 @@ int getJstacks(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out) { minifi::io::BufferStream stream; stream.write(&op, 1); stream.write("jstack"); - if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), gsl::narrow<int>(stream.size())) < 0) { + if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) { return -1; } // read the response @@ -171,7 +171,7 @@ int getConnectionSize(std::unique_ptr<minifi::io::Socket> socket, std::ostream & stream.write(&op, 1); stream.write("queue"); stream.write(connection); - if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), gsl::narrow<int>(stream.size())) < 0) { + if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) { return -1; } // read the response @@ -191,7 +191,7 @@ int listComponents(std::unique_ptr<minifi::io::Socket> socket, std::ostream &out uint8_t op = minifi::c2::Operation::DESCRIBE; stream.write(&op, 1); stream.write("components"); - if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), gsl::narrow<int>(stream.size())) < 0) { + if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) { return -1; } uint16_t responses = 0; @@ -215,7 +215,7 @@ int listConnections(std::unique_ptr<minifi::io::Socket> socket, std::ostream &ou uint8_t op = minifi::c2::Operation::DESCRIBE; stream.write(&op, 1); stream.write("connections"); - if (socket->write(const_cast<uint8_t*>(stream.getBuffer()), gsl::narrow<int>(stream.size())) < 0) { + if (minifi::io::isError(socket->write(stream.getBuffer(), stream.size()))) { return -1; } uint16_t responses = 0; diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp index a7c375a..d2d55f1 100644 --- a/extensions/aws/s3/S3Wrapper.cpp +++ b/extensions/aws/s3/S3Wrapper.cpp @@ -24,11 +24,12 @@ #include <utility> #include <vector> +#include "S3ClientRequestSender.h" #include "utils/GeneralUtils.h" #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" #include "utils/RegexUtils.h" -#include "S3ClientRequestSender.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -124,19 +125,20 @@ bool S3Wrapper::deleteObject(const DeleteObjectRequestParameters& params) { int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::BaseStream& output) { std::vector<uint8_t> buffer(4096); - int64_t write_size = 0; - while (write_size < data_size) { - auto next_write_size = (std::min)(data_size - write_size, static_cast<int64_t>(4096)); - if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size)) { + size_t write_size = 0; + if (data_size < 0) return 0; + while (write_size < gsl::narrow<uint64_t>(data_size)) { + const auto next_write_size = (std::min)(gsl::narrow<size_t>(data_size) - write_size, size_t{4096}); + if (!source.read(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_write_size))) { return -1; } - auto ret = output.write(buffer.data(), next_write_size); - if (ret < 0) { - return ret; + const auto ret = output.write(buffer.data(), next_write_size); + if (io::isError(ret)) { + return -1; } write_size += next_write_size; } - return write_size; + return gsl::narrow<int64_t>(write_size); } minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& out_body) { diff --git a/extensions/bustache/ApplyTemplate.cpp b/extensions/bustache/ApplyTemplate.cpp index d4cdde5..84b0338 100644 --- a/extensions/bustache/ApplyTemplate.cpp +++ b/extensions/bustache/ApplyTemplate.cpp @@ -84,8 +84,7 @@ int64_t ApplyTemplate::WriteCallback::process(const std::shared_ptr<io::BaseStre // TODO(calebj) write ostream reciever for format() to prevent excessive copying std::string ostring = to_string(format(data)); - stream->write(reinterpret_cast<uint8_t *>(const_cast<char *>(ostring.c_str())), - ostring.length()); + stream->write(reinterpret_cast<const uint8_t *>(ostring.c_str()), ostring.length()); return ostring.length(); } diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp index e544e82..09c52e8 100644 --- a/extensions/civetweb/processors/ListenHTTP.cpp +++ b/extensions/civetweb/processors/ListenHTTP.cpp @@ -476,29 +476,27 @@ void ListenHTTP::Handler::writeBody(mg_connection *conn, const mg_request_info * std::unique_ptr<io::BufferStream> ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const struct mg_request_info *req_info) { auto content_buffer = utils::make_unique<io::BufferStream>(); - int64_t rlen; - int64_t nlen = 0; + size_t nlen = 0; int64_t tlen = req_info->content_length; uint8_t buf[16384]; // if we have no content length we should call mg_read until // there is no data left from the stream to be HTTP/1.1 compliant - while (tlen == -1 || nlen < tlen) { - rlen = tlen == -1 ? sizeof(buf) : tlen - nlen; - - if (rlen > (int64_t) sizeof(buf)) { - rlen = (int64_t) sizeof(buf); + while (tlen == -1 || (tlen > 0 && nlen < gsl::narrow<size_t>(tlen))) { + auto rlen = tlen == -1 ? sizeof(buf) : gsl::narrow<size_t>(tlen) - nlen; + if (rlen > sizeof(buf)) { + rlen = sizeof(buf); } // Read a buffer of data from client - rlen = mg_read(conn, &buf[0], (size_t) rlen); - - if (rlen <= 0) { + const auto mg_read_return = mg_read(conn, &buf[0], rlen); + if (mg_read_return <= 0) { break; } + rlen = gsl::narrow<size_t>(mg_read_return); // Transfer buffer data to the output stream - content_buffer->write(&buf[0], gsl::narrow<int>(rlen)); + content_buffer->write(&buf[0], rlen); nlen += rlen; } @@ -511,7 +509,8 @@ ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream> reque } int64_t ListenHTTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) { - return stream->write(const_cast<uint8_t*>(request_content_->getBuffer()), gsl::narrow<int>(request_content_->size())); + const auto write_ret = stream->write(request_content_->getBuffer(), request_content_->size()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } bool ListenHTTP::isSecure() const { diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp index 9366f91..639e875 100644 --- a/extensions/http-curl/client/HTTPStream.cpp +++ b/extensions/http-curl/client/HTTPStream.cpp @@ -19,6 +19,7 @@ #include "HTTPStream.h" #include <fstream> +#include <utility> #include <memory> #include "HTTPCallback.h" @@ -32,7 +33,7 @@ namespace minifi { namespace io { HttpStream::HttpStream(std::shared_ptr<utils::HTTPClient> client) - : http_client_(client), + : http_client_(std::move(client)), written(0), // given the nature of the stream we don't want to slow libCURL, we will produce // a warning instead allowing us to adjust it server side or through the local configuration. @@ -54,27 +55,23 @@ void HttpStream::seek(size_t /*offset*/) { // data stream overrides -int HttpStream::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); - if (size == 0) { - return 0; +size_t HttpStream::write(const uint8_t *value, size_t size) { + if (size == 0) return 0; + if (IsNullOrEmpty(value)) { + return STREAM_ERROR; } - if (!IsNullOrEmpty(value)) { + if (!started_) { + std::lock_guard<std::mutex> lock(mutex_); if (!started_) { - std::lock_guard<std::mutex> lock(mutex_); - if (!started_) { - callback_.ptr = &http_callback_; - callback_.pos = 0; - http_client_->setUploadCallback(&callback_); - http_client_future_ = std::async(std::launch::async, submit_client, http_client_); - started_ = true; - } + callback_.ptr = &http_callback_; + callback_.pos = 0; + http_client_->setUploadCallback(&callback_); + http_client_future_ = std::async(std::launch::async, submit_client, http_client_); + started_ = true; } - http_callback_.process(value, size); - return size; - } else { - return -1; } + http_callback_.process(value, size); + return size; } size_t HttpStream::read(uint8_t *buf, size_t buflen) { diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h index b23d515..d6ce39b 100644 --- a/extensions/http-curl/client/HTTPStream.h +++ b/extensions/http-curl/client/HTTPStream.h @@ -97,7 +97,7 @@ class HttpStream : public io::BaseStream { * @param value value to write * @param size size of value */ - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; static bool submit_client(std::shared_ptr<utils::HTTPClient> client) { if (client == nullptr) @@ -132,17 +132,6 @@ class HttpStream : public io::BaseStream { } protected: - /** - * Populates the vector using the provided type name. - * @param buf output buffer - * @param t incoming object - * @returns number of bytes read. - */ - template<typename T> - int readBuffer(std::vector<uint8_t>&, const T&); - - void reset(); - std::vector<uint8_t> array; std::shared_ptr<utils::HTTPClient> http_client_; diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h index 890acb7..e797b37 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -303,7 +303,7 @@ class FlowFileResponder : public ServerAwareHandler { } uint64_t length = flow->data.size(); stream.write(length); - stream.write(flow->data.data(), gsl::narrow<int>(length)); + stream.write(flow->data.data(), gsl::narrow<size_t>(length)); } } else { mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: " diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp index 9c03e64..a77bf00 100644 --- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp +++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp @@ -152,7 +152,8 @@ class CallBack : public minifi::OutputStreamCallback { virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) { // leaving the typo for posterity sake std::string st = "we're gnna write some test stuff"; - return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), gsl::narrow<int>(st.length())); + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(st.c_str()), st.length()); + return minifi::io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } }; diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h index 5f994bb..b7ef344 100644 --- a/extensions/jni/jvm/JniReferenceObjects.h +++ b/extensions/jni/jvm/JniReferenceObjects.h @@ -29,6 +29,7 @@ #include "core/Processor.h" #include "core/ProcessSession.h" #include "core/WeakReference.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -101,7 +102,8 @@ class JniByteOutStream : public minifi::OutputStreamCallback { virtual ~JniByteOutStream() = default; virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) { - return stream->write((uint8_t*) bytes_, length_); + const auto write_ret = stream->write((uint8_t*)bytes_, length_); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } private: jbyte *bytes_; diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h index 9e06aa6..8865dfe 100644 --- a/extensions/libarchive/CompressContent.h +++ b/extensions/libarchive/CompressContent.h @@ -180,11 +180,10 @@ public: int status_; static la_ssize_t archive_write(struct archive* /*arch*/, void *context, const void *buff, size_t size) { - WriteCallback *callback = (WriteCallback *) context; - la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), gsl::narrow<int>(size)); - if (ret > 0) - callback->size_ += (int64_t) ret; - return ret; + auto* const callback = static_cast<WriteCallback*>(context); + const auto ret = callback->stream_->write(reinterpret_cast<const uint8_t*>(buff), size); + if (!io::isError(ret)) callback->size_ += gsl::narrow<int64_t>(ret); + return io::isError(ret) ? -1 : gsl::narrow<la_ssize_t>(ret); } static la_ssize_t archive_read(struct archive* archive, void *context, const void **buff) { @@ -337,8 +336,8 @@ public: if (read_result == 0) break; size_ += read_result; - const auto write_result = stream_->write(reinterpret_cast<uint8_t*>(buffer), gsl::narrow<int>(read_result)); - if (write_result < 0) { + const auto write_result = stream_->write(reinterpret_cast<uint8_t*>(buffer), gsl::narrow<size_t>(read_result)); + if (io::isError(write_result)) { archive_read_log_error_cleanup(arch); return -1; } @@ -377,8 +376,8 @@ public: int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) override { std::vector<uint8_t> buffer(16 * 1024U); - int64_t read_size = 0; - while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) { + size_t read_size = 0; + while (read_size < writer_.flow_->getSize()) { const auto ret = inputStream->read(buffer.data(), buffer.size()); if (io::isError(ret)) { return -1; @@ -393,7 +392,7 @@ public: } } outputStream_->close(); - return read_size; + return gsl::narrow<int64_t>(read_size); } GzipWriteCallback& writer_; diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h index ac83728..d14b924 100644 --- a/extensions/libarchive/MergeContent.h +++ b/extensions/libarchive/MergeContent.h @@ -64,7 +64,7 @@ class BinaryConcatenationMerge : public MergeBin { BinaryConcatenationMerge(const std::string& header, const std::string& footer, const std::string& demarcator); void merge(core::ProcessContext *context, core::ProcessSession *session, - std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile); + std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) override; // Nest Callback Class for write stream class WriteCallback: public OutputStreamCallback { public: @@ -77,35 +77,35 @@ class BinaryConcatenationMerge : public MergeBin { std::string &demarcator_; std::deque<std::shared_ptr<core::FlowFile>> &flows_; FlowFileSerializer& serializer_; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t ret = 0; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + size_t write_size_sum = 0; if (!header_.empty()) { - int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(header_.data())), gsl::narrow<int>(header_.size())); - if (len < 0) - return len; - ret += len; + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(header_.data()), header_.size()); + if (io::isError(write_ret)) + return -1; + write_size_sum += write_ret; } bool isFirst = true; - for (auto flow : flows_) { + for (const auto& flow : flows_) { if (!isFirst && !demarcator_.empty()) { - int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(demarcator_.data())), gsl::narrow<int>(demarcator_.size())); - if (len < 0) - return len; - ret += len; + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(demarcator_.data()), demarcator_.size()); + if (io::isError(write_ret)) + return -1; + write_size_sum += write_ret; } int len = serializer_.serialize(flow, stream); if (len < 0) return len; - ret += len; + write_size_sum += gsl::narrow<size_t>(len); isFirst = false; } if (!footer_.empty()) { - int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(footer_.data())), gsl::narrow<int>(footer_.size())); - if (len < 0) - return len; - ret += len; + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(footer_.data()), footer_.size()); + if (io::isError(write_ret)) + return -1; + write_size_sum += write_ret; } - return ret; + return gsl::narrow<int64_t>(write_size_sum); } }; @@ -122,25 +122,26 @@ class ArchiveMerge { class ArchiveWriter : public io::OutputStream { public: ArchiveWriter(struct archive *arch, struct archive_entry *entry) : arch_(arch), entry_(entry) {} - int write(const uint8_t* data, int size) override { + size_t write(const uint8_t* data, size_t size) override { if (!header_emitted_) { if (archive_write_header(arch_, entry_) != ARCHIVE_OK) { - return -1; + return io::STREAM_ERROR; } header_emitted_ = true; } - int totalWrote = 0; - int remaining = size; + size_t totalWrote = 0; + size_t remaining = size; while (remaining > 0) { const auto ret = archive_write_data(arch_, data + totalWrote, remaining); if (ret < 0) { - return ret; + return io::STREAM_ERROR; } - if (ret == 0) { + const auto zret = gsl::narrow<size_t>(ret); + if (zret == 0) { break; } - totalWrote += ret; - remaining -= ret; + totalWrote += zret; + remaining -= zret; } return totalWrote; } @@ -161,7 +162,7 @@ class ArchiveMerge { size_ = 0; stream_ = nullptr; } - ~WriteCallback() = default; + ~WriteCallback() override = default; std::string merge_type_; std::deque<std::shared_ptr<core::FlowFile>> &flows_; @@ -174,10 +175,10 @@ class ArchiveMerge { WriteCallback *callback = (WriteCallback *) context; uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<void*>(buff)); la_ssize_t totalWrote = 0; - int remaining = gsl::narrow<int>(size); + size_t remaining = size; while (remaining > 0) { - la_ssize_t ret = callback->stream_->write(data + totalWrote, remaining); - if (ret < 0) { + const auto ret = callback->stream_->write(data + totalWrote, remaining); + if (io::isError(ret)) { // libarchive expects us to return -1 on error return -1; } @@ -185,13 +186,13 @@ class ArchiveMerge { break; } callback->size_ += ret; - totalWrote += ret; + totalWrote += static_cast<la_ssize_t>(ret); remaining -= ret; } return totalWrote; } - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { struct archive *arch; arch = archive_write_new(); @@ -305,7 +306,7 @@ class MergeContent : public processors::BinFiles { attributeStrategy_ = merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON; } // Destructor - virtual ~MergeContent() = default; + ~MergeContent() override = default; // Processor Name static constexpr char const* ProcessorName = "MergeContent"; // Supported Properties diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp index e1fb1f9..de54bc2 100644 --- a/extensions/libarchive/UnfocusArchiveEntry.cpp +++ b/extensions/libarchive/UnfocusArchiveEntry.cpp @@ -153,9 +153,9 @@ typedef struct { } UnfocusArchiveEntryWriteData; la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *d, const void *buffer, size_t length) { - auto data = static_cast<UnfocusArchiveEntryWriteData *>(d); - const uint8_t *ui_buffer = static_cast<const uint8_t*>(buffer); - return data->stream->write(const_cast<uint8_t*>(ui_buffer), gsl::narrow<int>(length)); + auto* const data = static_cast<UnfocusArchiveEntryWriteData *>(d); + const auto write_ret = data->stream->write(static_cast<const uint8_t*>(buffer), length); + return io::isError(write_ret) ? -1 : gsl::narrow<la_ssize_t>(write_ret); } int64_t UnfocusArchiveEntry::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) { diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp index 290466c..38f7f99 100644 --- a/extensions/librdkafka/ConsumeKafka.cpp +++ b/extensions/librdkafka/ConsumeKafka.cpp @@ -556,11 +556,10 @@ void ConsumeKafka::onTrigger(core::ProcessContext* /* context */, core::ProcessS } int64_t ConsumeKafka::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t ret = 0; - if (data_) { - ret = stream->write(data_, gsl::narrow<int>(dataSize_)); - } - return ret; + if (!data_) return 0; + const auto write_ret = stream->write(data_, dataSize_); + if (io::isError(write_ret)) return -1; + return gsl::narrow<int64_t>(write_ret); } } // namespace processors diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h index b3884fb..10208c6 100644 --- a/extensions/mqtt/processors/ConsumeMQTT.h +++ b/extensions/mqtt/processors/ConsumeMQTT.h @@ -32,6 +32,7 @@ #include "concurrentqueue.h" #include "MQTTClient.h" #include "AbstractMQTTProcessor.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -57,7 +58,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { maxSegSize_ = ULLONG_MAX; } // Destructor - virtual ~ConsumeMQTT() { + ~ConsumeMQTT() override { MQTTClient_message *message; while (queue_.try_dequeue(message)) { MQTTClient_freeMessage(&message); @@ -74,18 +75,23 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { // Nest Callback Class for write stream class WriteCallback : public OutputStreamCallback { public: - WriteCallback(MQTTClient_message *message) + explicit WriteCallback(MQTTClient_message *message) : message_(message) { - status_ = 0; } MQTTClient_message *message_; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen); - if (len < 0) + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + if (message_->payloadlen < 0) { status_ = -1; - return len; + return -1; + } + const auto len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), gsl::narrow<size_t>(message_->payloadlen)); + if (io::isError(len)) { + status_ = -1; + return -1; + } + return gsl::narrow<int64_t>(len); } - int status_; + int status_ = 0; }; public: @@ -99,7 +105,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { // OnTrigger method, implemented by NiFi ConsumeMQTT void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; // Initialize, over write by NiFi ConsumeMQTT - void initialize(void) override; + void initialize() override; bool enqueueReceiveMQTTMsg(MQTTClient_message *message) override; protected: diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h index 3a1d978..f4102c2 100644 --- a/extensions/opc/include/fetchopc.h +++ b/extensions/opc/include/fetchopc.h @@ -38,6 +38,7 @@ #include "controllers/SSLContextService.h" #include "core/logging/LoggerConfiguration.h" #include "utils/Id.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -83,7 +84,8 @@ protected: : data_(data) { } int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size()); + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_.c_str()), data_.size()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } }; std::string nodeID_; diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h index 8d16a1e..0262478 100644 --- a/extensions/opencv/CaptureRTSPFrame.h +++ b/extensions/opencv/CaptureRTSPFrame.h @@ -19,13 +19,13 @@ #define NIFI_MINIFI_CPP_CAPTURERTSPFRAME_H #include <atomic> - -#include <core/Resource.h> -#include <core/Processor.h> -#include <opencv2/opencv.hpp> - #include <iomanip> #include <ctime> +#include <opencv2/opencv.hpp> + +#include "core/Resource.h" +#include "core/Processor.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -69,10 +69,9 @@ class CaptureRTSPFrame : public core::Processor { ~CaptureRTSPFrameWriteCallback() override = default; int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { - int64_t ret = 0; imencode(image_encoding_, image_mat_, image_buf_); - ret = stream->write(image_buf_.data(), image_buf_.size()); - return ret; + const auto ret = stream->write(image_buf_.data(), image_buf_.size()); + return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret); } private: diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h index 73155a3..fa8d192 100644 --- a/extensions/opencv/FrameIO.h +++ b/extensions/opencv/FrameIO.h @@ -18,6 +18,8 @@ #ifndef NIFI_MINIFI_CPP_FRAMEIO_H #define NIFI_MINIFI_CPP_FRAMEIO_H +#include "utils/gsl.h" + namespace org { namespace apache { namespace nifi { @@ -33,10 +35,9 @@ class FrameWriteCallback : public OutputStreamCallback { ~FrameWriteCallback() override = default; int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { - int64_t ret = 0; imencode(image_encoding_, image_mat_, image_buf_); - ret = stream->write(image_buf_.data(), image_buf_.size()); - return ret; + const auto ret = stream->write(image_buf_.data(), image_buf_.size()); + return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret); } private: diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp index 39a1eff..ee93d01 100644 --- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp +++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp @@ -606,7 +606,8 @@ SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char* } int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) { - return stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_)); + const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_)); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } int SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNodeH node, void* data) { diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 54e42d4..b7e5ef9 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -87,8 +87,8 @@ void DatabaseContentRepository::Session::commit() { if (outStream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath()); } - const int size = gsl::narrow<int>(resource.second->size()); - if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) { + const auto size = resource.second->size(); + if (outStream->write(resource.second->getBuffer(), size) != size) { throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath()); } } @@ -97,8 +97,8 @@ void DatabaseContentRepository::Session::commit() { if (outStream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath()); } - const int size = gsl::narrow<int>(resource.second->size()); - if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) { + const auto size = resource.second->size(); + if (outStream->write(resource.second->getBuffer(), size) != size) { throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath()); } } diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp index 735692b..4146a87 100644 --- a/extensions/rocksdb-repos/RocksDbStream.cpp +++ b/extensions/rocksdb-repos/RocksDbStream.cpp @@ -51,36 +51,28 @@ void RocksDbStream::seek(size_t /*offset*/) { // noop } -int RocksDbStream::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); - if (!write_enable_) { - return -1; +size_t RocksDbStream::write(const uint8_t *value, size_t size) { + if (!write_enable_) return STREAM_ERROR; + if (size == 0) return 0; + if (IsNullOrEmpty(value)) return STREAM_ERROR; + auto opendb = db_->open(); + if (!opendb) { + return STREAM_ERROR; } - if (size == 0) { - return 0; + rocksdb::Slice slice_value((const char*)value, size); + rocksdb::Status status; + size_ += size; + if (batch_ != nullptr) { + status = batch_->Merge(path_, slice_value); + } else { + rocksdb::WriteOptions opts; + opts.sync = true; + status = opendb->Merge(opts, path_, slice_value); } - if (!IsNullOrEmpty(value)) { - auto opendb = db_->open(); - if (!opendb) { - return -1; - } - rocksdb::Slice slice_value((const char *) value, size); - rocksdb::Status status; - size_ += size; - if (batch_ != nullptr) { - status = batch_->Merge(path_, slice_value); - } else { - rocksdb::WriteOptions opts; - opts.sync = true; - status = opendb->Merge(opts, path_, slice_value); - } - if (status.ok()) { - return size; - } else { - return -1; - } + if (status.ok()) { + return size; } else { - return -1; + return STREAM_ERROR; } } diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h index cf4946f..e526d8c 100644 --- a/extensions/rocksdb-repos/RocksDbStream.h +++ b/extensions/rocksdb-repos/RocksDbStream.h @@ -77,7 +77,7 @@ class RocksDbStream : public io::BaseStream { * @param value value to write * @param size size of value */ - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; protected: std::string path_; diff --git a/extensions/script/lua/LuaBaseStream.cpp b/extensions/script/lua/LuaBaseStream.cpp index 3545c77..5e9d423 100644 --- a/extensions/script/lua/LuaBaseStream.cpp +++ b/extensions/script/lua/LuaBaseStream.cpp @@ -60,7 +60,7 @@ std::string LuaBaseStream::read(size_t len) { } size_t LuaBaseStream::write(std::string buf) { - return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf.data())), static_cast<int>(buf.length()))); + return stream_->write(reinterpret_cast<const uint8_t*>(buf.data()), buf.length()); } } /* namespace lua */ diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp index 5e583f7..60259ed 100644 --- a/extensions/script/python/PyBaseStream.cpp +++ b/extensions/script/python/PyBaseStream.cpp @@ -54,9 +54,8 @@ py::bytes PyBaseStream::read(size_t len) { } size_t PyBaseStream::write(const py::bytes& buf) { - const auto &&buf_str = buf.operator std::string(); - return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf_str.data())), - static_cast<int>(buf_str.length()))); + auto buf_str = buf.operator std::string(); + return stream_->write(reinterpret_cast<const uint8_t*>(buf_str.data()), buf_str.length()); } } /* namespace python */ diff --git a/extensions/sensors/SensorBase.h b/extensions/sensors/SensorBase.h index 0bdeb52..9b064b4 100644 --- a/extensions/sensors/SensorBase.h +++ b/extensions/sensors/SensorBase.h @@ -48,13 +48,13 @@ class SensorBase : public core::Processor { /*! * Create a new processor */ - SensorBase(const std::string& name, const utils::Identifier& uuid = {}) + explicit SensorBase(const std::string& name, const utils::Identifier& uuid = {}) : Processor(name, uuid), imu(nullptr), logger_(logging::LoggerFactory<SensorBase>::getLogger()) { } // Destructor - virtual ~SensorBase(); + ~SensorBase() override; // Processor Name static core::Relationship Success; // Supported Properties @@ -65,17 +65,14 @@ class SensorBase : public core::Processor { class WriteCallback : public OutputStreamCallback { public: - WriteCallback(std::string data) - : _data(const_cast<char*>(data.data())), - _dataSize(data.size()) { - } - char *_data; - uint64_t _dataSize; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t ret = 0; - if (_data && _dataSize > 0) - ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize); - return ret; + explicit WriteCallback(std::string data) + : data_{std::move(data)} + {} + std::string data_; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + if (data_.empty()) return 0; + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_.data()), data_.size()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } }; protected: diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp index 7af066d..cfa47ba 100644 --- a/extensions/sftp/client/SFTPClient.cpp +++ b/extensions/sftp/client/SFTPClient.cpp @@ -513,16 +513,16 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_ break; } logger_->log_trace("Read %d bytes from remote file \"%s\"", read_ret, path.c_str()); - total_read += read_ret; - int remaining = read_ret; + total_read += gsl::narrow<uint64_t>(read_ret); + auto remaining = read_ret; while (remaining > 0) { - int write_ret = output.write(buf.data() + (read_ret - remaining), remaining); - if (write_ret < 0) { + const auto write_ret = output.write(buf.data() + (read_ret - remaining), gsl::narrow<size_t>(remaining)); + if (io::isError(write_ret)) { last_error_.setLibssh2Error(LIBSSH2_FX_OK); logger_->log_error("Failed to write output"); return false; } - remaining -= write_ret; + remaining -= gsl::narrow<decltype(remaining)>(write_ret); } } while (true); diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h index 51d72fd..c9e5b1b 100644 --- a/extensions/standard-processors/processors/ExecuteProcess.h +++ b/extensions/standard-processors/processors/ExecuteProcess.h @@ -42,6 +42,7 @@ #include "core/Resource.h" #include "FlowFileRecord.h" #include "io/BaseStream.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -92,11 +93,10 @@ class ExecuteProcess : public core::Processor { char *_data; uint64_t _dataSize; // void process(std::ofstream *stream) { - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t ret = 0; - if (_data && _dataSize > 0) - ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize); - return ret; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + if (!_data || _dataSize <= 0) return 0; + const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } }; diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp b/extensions/standard-processors/processors/GenerateFlowFile.cpp index bf199b4..58cc738 100644 --- a/extensions/standard-processors/processors/GenerateFlowFile.cpp +++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp @@ -148,7 +148,7 @@ void GenerateFlowFile::onTrigger(core::ProcessContext* /*context*/, core::Proces if (fileSize_ > 0) { generateData(data, textData_); } - GenerateFlowFile::WriteCallback callback(std::move(data)); + GenerateFlowFile::WriteCallback callback(data); session->write(flowFile, &callback); } else { GenerateFlowFile::WriteCallback callback(data_); diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h index 3c346d3..37b2bbb 100644 --- a/extensions/standard-processors/processors/GenerateFlowFile.h +++ b/extensions/standard-processors/processors/GenerateFlowFile.h @@ -52,7 +52,7 @@ class GenerateFlowFile : public core::Processor { textData_ = false; } // Destructor - virtual ~GenerateFlowFile() = default; + ~GenerateFlowFile() override = default; // Processor Name static constexpr char const* ProcessorName = "GenerateFlowFile"; // Supported Properties @@ -67,16 +67,14 @@ class GenerateFlowFile : public core::Processor { // Nest Callback Class for write stream class WriteCallback : public OutputStreamCallback { public: - WriteCallback(std::vector<char> && data) : data_(std::move(data)) { // NOLINT - } - WriteCallback(const std::vector<char>& data) : data_(data) { // NOLINT - } - std::vector<char> data_; - int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - int64_t ret = 0; - if (data_.size() > 0) - ret = stream->write(reinterpret_cast<uint8_t*>(&data_[0]), gsl::narrow<int>(data_.size())); - return ret; + explicit WriteCallback(const std::vector<char>& data) + :data_(&data) + { } + gsl::not_null<const std::vector<char>*> data_; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { + if (data_->empty()) return 0; + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_->data()), data_->size()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } }; @@ -85,7 +83,7 @@ class GenerateFlowFile : public core::Processor { // OnTrigger method, implemented by NiFi GenerateFlowFile void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; // Initialize, over write by NiFi GenerateFlowFile - void initialize(void) override; + void initialize() override; protected: std::vector<char> data_; diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h index c2a5b49..7595763 100644 --- a/extensions/standard-processors/processors/GetTCP.h +++ b/extensions/standard-processors/processors/GetTCP.h @@ -98,7 +98,8 @@ class DataHandlerCallback : public OutputStreamCallback { ~DataHandlerCallback() override = default; int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { - return stream->write(message_, gsl::narrow<int>(size_)); + const auto write_ret = stream->write(message_, size_); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } private: diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h index 271f739..b986730 100644 --- a/extensions/standard-processors/processors/ListenSyslog.h +++ b/extensions/standard-processors/processors/ListenSyslog.h @@ -50,6 +50,7 @@ #include "core/ProcessSession.h" #include "core/Resource.h" #include "FlowFileRecord.h" +#include "utils/gsl.h" #ifndef WIN32 @@ -89,15 +90,14 @@ class ListenSyslog : public core::Processor { _serverSocket = 0; _maxFds = 0; FD_ZERO(&_readfds); - _thread = NULL; + _thread = nullptr; _resetServerSocket = false; _serverTheadRunning = false; } // Destructor - virtual ~ListenSyslog() { + ~ListenSyslog() override { _serverTheadRunning = false; - if (this->_thread) - delete this->_thread; + delete this->_thread; // need to reset the socket std::vector<int>::iterator it; for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { @@ -135,10 +135,9 @@ class ListenSyslog : public core::Processor { uint8_t *_data; uint64_t _dataSize; int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { - int64_t ret = 0; - if (_data && _dataSize > 0) - ret = stream->write(_data, _dataSize); - return ret; + if (!_data || _dataSize <= 0) return 0; + const auto write_ret = stream->write(_data, _dataSize); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } }; diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index 26ea193..6d4bf78 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -214,7 +214,7 @@ class FileReaderCallback : public OutputStreamCallback { while (hasMoreToRead() && !found_delimiter) { if (begin_ == end_) { - input_stream_.read(reinterpret_cast<char *>(buffer_.data()), buffer_.size()); + input_stream_.read(reinterpret_cast<char *>(buffer_.data()), gsl::narrow<std::streamsize>(buffer_.size())); const auto num_bytes_read = input_stream_.gcount(); logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read}); @@ -226,15 +226,10 @@ class FileReaderCallback : public OutputStreamCallback { char *delimiter_pos = std::find(begin_, end_, input_delimiter_); found_delimiter = (delimiter_pos != end_); - ptrdiff_t zlen{std::distance(begin_, delimiter_pos)}; - if (found_delimiter) { - zlen += 1; - } - const int len = gsl::narrow<int>(zlen); - - crc_stream.write(reinterpret_cast<uint8_t*>(begin_), len); - num_bytes_written += len; - begin_ += len; + const auto zlen = gsl::narrow<size_t>(std::distance(begin_, delimiter_pos)) + (found_delimiter ? 1 : 0); + crc_stream.write(reinterpret_cast<uint8_t*>(begin_), zlen); + num_bytes_written += zlen; + begin_ += zlen; } if (found_delimiter) { diff --git a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp index da29f4b..4f221d9 100644 --- a/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp +++ b/extensions/standard-processors/tests/integration/SecureSocketGetTCPTest.cpp @@ -92,7 +92,7 @@ class SecureSocketTest : public IntegrationBase { std::shared_ptr<minifi::processors::GetTCP> inv = std::dynamic_pointer_cast<minifi::processors::GetTCP>(proc); assert(inv != nullptr); - std::string url = ""; + std::string url; configuration->set("nifi.remote.input.secure", "true"); std::string path = key_dir + "cn.crt.pem"; configuration->set("nifi.security.client.certificate", path); @@ -117,23 +117,15 @@ class SecureSocketTest : public IntegrationBase { assert(0 == server_socket_->initialize()); isRunning_ = true; - check = [this]() -> bool { - return isRunning_; + auto handler = [](std::vector<uint8_t> *b) { + *b = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', 0, 0, 0, 0, 0, 0, 0, 0, 0}; + assert(b->size() == 20); + return b->size(); }; - handler = [](std::vector<uint8_t> *b, int *size) { - std::cout << "oh write!" << std::endl; - b->reserve(20); - memset(b->data(), 0x00, 20); - memcpy(b->data(), "hello world", 11); - *size = 20; - return *size; - }; - server_socket_->registerCallback(check, handler, std::chrono::milliseconds(50)); + server_socket_->registerCallback([this] { return isRunning_.load(); }, std::move(handler), std::chrono::milliseconds(50)); } protected: - std::function<bool()> check; - std::function<int(std::vector<uint8_t>*b, int *size)> handler; bool isSecure; std::atomic<bool> isRunning_; std::string dir; diff --git a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp index 9278b9d..55800b0 100644 --- a/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp +++ b/extensions/standard-processors/tests/integration/TLSServerSocketSupportedProtocolsTest.cpp @@ -248,17 +248,13 @@ class TLSServerSocketSupportedProtocolsTest { assert(0 == server_socket_->initialize()); is_running_ = true; - auto check = [this]() -> bool { - return is_running_; + auto handler = [](std::vector<uint8_t> *bytes_written) { + const char contents[] = "hello world"; + *bytes_written = {std::begin(contents), std::end(contents)}; + assert(12 == bytes_written->size()); + return bytes_written->size(); }; - auto handler = [](std::vector<uint8_t> *bytes_written, int *size) { - std::string contents = "hello world"; - *bytes_written = {contents.begin(), contents.end()}; - bytes_written->push_back(0); - *size = bytes_written->size(); - return *size; - }; - server_socket_->registerCallback(check, handler, std::chrono::milliseconds(50)); + server_socket_->registerCallback([this]{ return is_running_.load(); }, std::move(handler), std::chrono::milliseconds(50)); } void verifyTLSServerSocketExclusiveCompatibilityWithTLSv1_2() { diff --git a/extensions/standard-processors/tests/unit/GetTCPTests.cpp b/extensions/standard-processors/tests/unit/GetTCPTests.cpp index 91ca60e..859ecbf 100644 --- a/extensions/standard-processors/tests/unit/GetTCPTests.cpp +++ b/extensions/standard-processors/tests/unit/GetTCPTests.cpp @@ -110,7 +110,7 @@ TEST_CASE("GetTCPWithoutEOM", "[GetTCP1]") { std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); processor->onSchedule(context, factory); processor->onTrigger(context, session); - server.write(buffer, gsl::narrow<int>(buffer.size())); + server.write(buffer, buffer.size()); std::this_thread::sleep_for(std::chrono::seconds(2)); logAttribute->initialize(); @@ -225,7 +225,7 @@ TEST_CASE("GetTCPWithOEM", "[GetTCP2]") { std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); processor->onSchedule(context, factory); processor->onTrigger(context, session); - server.write(buffer, gsl::narrow<int>(buffer.size())); + server.write(buffer, buffer.size()); std::this_thread::sleep_for(std::chrono::seconds(2)); logAttribute->initialize(); @@ -349,7 +349,7 @@ TEST_CASE("GetTCPWithOnlyOEM", "[GetTCP3]") { std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); processor->onSchedule(context, factory); processor->onTrigger(context, session); - server.write(buffer, gsl::narrow<int>(buffer.size())); + server.write(buffer, buffer.size()); std::this_thread::sleep_for(std::chrono::seconds(2)); logAttribute->initialize(); diff --git a/extensions/usb-camera/CMakeLists.txt b/extensions/usb-camera/CMakeLists.txt index 14c1c08..87e8c02 100644 --- a/extensions/usb-camera/CMakeLists.txt +++ b/extensions/usb-camera/CMakeLists.txt @@ -23,7 +23,7 @@ find_package(PNG) if(PNG_FOUND) set(PNG_LINK_FLAGS ${PNG_LIBRARIES}) else() - pkg_check_modules(PNG QUIET libpng) + pkg_check_modules(PNG libpng) if(PNG_FOUND) set(PNG_INCLUDE_DIR ${PNG_INCLUDE_DIRS}) set(PNG_LINK_FLAGS ${PNG_LDFLAGS}) diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp index 074341f..4d6d021 100644 --- a/extensions/usb-camera/GetUSBCamera.cpp +++ b/extensions/usb-camera/GetUSBCamera.cpp @@ -17,6 +17,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "GetUSBCamera.h" + #include <png.h> #include <utility> @@ -25,7 +27,7 @@ #include <set> #include <algorithm> -#include "GetUSBCamera.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -460,7 +462,8 @@ int64_t GetUSBCamera::PNGWriteCallback::process(const std::shared_ptr<io::BaseSt throw; } - return stream->write(png_output_buf_.data(), png_output_buf_.size()); + const auto write_ret = stream->write(png_output_buf_.data(), png_output_buf_.size()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame) @@ -470,7 +473,8 @@ GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame) int64_t GetUSBCamera::RawWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) { logger_->log_info("Writing %d bytes of raw capture data", frame_->data_bytes); - return stream->write(reinterpret_cast<uint8_t *>(frame_->data), frame_->data_bytes); + const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(frame_->data), frame_->data_bytes); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } } /* namespace processors */ diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp index b1d7201..4c74552 100644 --- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp +++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp @@ -625,16 +625,15 @@ void CollectorInitiatedSubscription::unsubscribe() { int CollectorInitiatedSubscription::processQueue(const std::shared_ptr<core::ProcessSession> &session) { struct WriteCallback: public OutputStreamCallback { explicit WriteCallback(const std::string& str) - : str_(str) { - status_ = 0; + : str_(&str) { } int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - return stream->write(reinterpret_cast<uint8_t*>(&str_[0]), gsl::narrow<int>(str_.size())); + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(str_->data()), str_->size()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } - std::string str_; - int status_; + gsl::not_null<const std::string*> str_; }; int flowFileCount = 0; diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp index 3746826..04a90a0 100644 --- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp +++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp @@ -695,7 +695,8 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender& } int64_t process(const std::shared_ptr<io::BaseStream>& stream) { - return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.c_str())), gsl::narrow<int>(str_.size())); + const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(str_.c_str()), str_.size()); + return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret); } const std::string& str_; diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h index 6456ff2..10b2eda 100644 --- a/libminifi/include/io/AtomicEntryStream.h +++ b/libminifi/include/io/AtomicEntryStream.h @@ -82,7 +82,7 @@ class AtomicEntryStream : public BaseStream { * @param value value to write * @param size size of value */ - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; private: size_t length_; @@ -110,22 +110,18 @@ void AtomicEntryStream<T>::seek(size_t offset) { // data stream overrides template<typename T> -int AtomicEntryStream<T>::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); - if (size == 0) { - return 0; - } - if (nullptr != value && !invalid_stream_) { - std::lock_guard<std::recursive_mutex> lock(entry_lock_); - if (entry_->insert(key_, const_cast<uint8_t*>(value), size)) { - offset_ += size; - if (offset_ > length_) { - length_ = offset_; - } - return size; +size_t AtomicEntryStream<T>::write(const uint8_t *value, size_t size) { + if (size == 0) return 0; + if (!value || invalid_stream_) return STREAM_ERROR; + std::lock_guard<std::recursive_mutex> lock(entry_lock_); + if (entry_->insert(key_, const_cast<uint8_t*>(value), size)) { + offset_ += size; + if (offset_ > length_) { + length_ = offset_; } + return size; } - return -1; + return STREAM_ERROR; } template<typename T> diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h index 7ab2b9e..2290a3f 100644 --- a/libminifi/include/io/BufferStream.h +++ b/libminifi/include/io/BufferStream.h @@ -40,13 +40,13 @@ class BufferStream : public BaseStream { } explicit BufferStream(const std::string& data) { - write(reinterpret_cast<const uint8_t*>(data.c_str()), gsl::narrow<int>(data.length())); + write(reinterpret_cast<const uint8_t*>(data.c_str()), data.length()); } using BaseStream::read; using BaseStream::write; - int write(const uint8_t* data, int len) final; + size_t write(const uint8_t* data, size_t len) final; size_t read(uint8_t* buffer, size_t len) override; diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h index b6ae751..70949a4 100644 --- a/libminifi/include/io/CRCStream.h +++ b/libminifi/include/io/CRCStream.h @@ -104,9 +104,9 @@ class OutputCRCStream : public virtual CRCStreamBase<StreamType>, public OutputS public: using OutputStream::write; - int write(const uint8_t *value, int size) override { - int ret = child_stream_->write(value, size); - if (ret > 0) { + size_t write(const uint8_t *value, size_t size) override { + const auto ret = child_stream_->write(value, size); + if (ret > 0 && !io::isError(ret)) { crc_ = crc32(crc_, value, ret); } return ret; diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index db71cba..e5f72b3 100644 --- a/libminifi/include/io/ClientSocket.h +++ b/libminifi/include/io/ClientSocket.h @@ -151,7 +151,7 @@ class Socket : public BaseStream { using BaseStream::write; using BaseStream::read; - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; /** * Reads data and places it into buf diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h index ee5aec7..695f543 100644 --- a/libminifi/include/io/DescriptorStream.h +++ b/libminifi/include/io/DescriptorStream.h @@ -66,7 +66,7 @@ class DescriptorStream : public io::BaseStream { * @param value value to write * @param size size of value */ - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; private: std::recursive_mutex file_lock_; diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h index ad7f0dc..3ef4db7 100644 --- a/libminifi/include/io/FileStream.h +++ b/libminifi/include/io/FileStream.h @@ -84,7 +84,7 @@ class FileStream : public io::BaseStream { * @param value value to write * @param size size of value */ - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; private: void seekToEndOfFile(const char* caller_error_msg); diff --git a/libminifi/include/io/OutputStream.h b/libminifi/include/io/OutputStream.h index f14a0b3..b02c0ca 100644 --- a/libminifi/include/io/OutputStream.h +++ b/libminifi/include/io/OutputStream.h @@ -45,40 +45,52 @@ class OutputStream : public virtual Stream { * @param len length of value * @return resulting write size **/ - virtual int write(const uint8_t *value, int len) = 0; + virtual size_t write(const uint8_t *value, size_t len) = 0; - int write(const std::vector<uint8_t>& buffer, int len); + /** + * write: resolve nullptr ambiguity + * call: write(nullptr, 0) + * candidate1: write(const uint8_t*, size_t): conversion from std::nullptr_t to const uint8_t* and from int to size_t + * candidate2: write(const char*, bool): conversion from std::nullptr_t to const char* and from int to bool + * @param len + * @return + */ + size_t write(std::nullptr_t, size_t len) { + return write(static_cast<const uint8_t*>(nullptr), len); + } + + size_t write(const std::vector<uint8_t>& buffer, size_t len); /** * write bool to stream * @param value non encoded value * @return resulting write size **/ - int write(bool value); + size_t write(bool value); /** * write Identifier to stream * @param value non encoded value * @return resulting write size **/ - int write(const utils::Identifier& value); + size_t write(const utils::Identifier& value); /** * write string to stream * @param str string to write * @return resulting write size **/ - int write(const std::string& str, bool widen = false); + size_t write(const std::string& str, bool widen = false); /** * write string to stream * @param str string to write * @return resulting write size **/ - int write(const char* str, bool widen = false); + size_t write(const char* str, bool widen = false); template<size_t N> - int write(const utils::SmallString<N>& str, bool widen = false) { + size_t write(const utils::SmallString<N>& str, bool widen = false) { return write(str.c_str(), widen); } @@ -88,7 +100,7 @@ class OutputStream : public virtual Stream { * @return resulting write size **/ template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>> - int write(Integral value) { + size_t write(Integral value) { uint8_t buffer[sizeof(Integral)]{}; for (std::size_t byteIdx = 0; byteIdx < sizeof(Integral); ++byteIdx) { @@ -99,7 +111,7 @@ class OutputStream : public virtual Stream { } private: - int write_str(const char* str, uint32_t len, bool widen); + size_t write_str(const char* str, uint32_t len, bool widen); }; } // namespace io diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h index e694c2e..0428cf7 100644 --- a/libminifi/include/io/Stream.h +++ b/libminifi/include/io/Stream.h @@ -26,13 +26,9 @@ namespace io { constexpr size_t STREAM_ERROR = static_cast<size_t>(-1); -inline bool isError(const size_t read_return) noexcept { - return read_return == STREAM_ERROR // general error - || read_return == static_cast<size_t>(-2); // Socket EAGAIN, to be refactored to eliminate this error condition -} - -inline bool isError(const int write_return) noexcept { - return write_return == -1; +inline bool isError(const size_t read_write_return) noexcept { + return read_write_return == STREAM_ERROR // general error + || read_write_return == static_cast<size_t>(-2); // read: Socket EAGAIN, to be refactored to eliminate this error condition } /** diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h index abac015..b92b993 100644 --- a/libminifi/include/io/StreamPipe.h +++ b/libminifi/include/io/StreamPipe.h @@ -55,14 +55,14 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar auto remaining = readRet; int transferred = 0; while (remaining > 0) { - int writeRet = dst->write(buffer + transferred, remaining); + const auto writeRet = dst->write(buffer + transferred, remaining); // TODO(adebreceni): // write might return 0, e.g. in case of a congested server // what should we return then? // - the number of bytes read or // - the number of bytes wrote - if (writeRet < 0) { - return writeRet; + if (io::isError(writeRet)) { + return -1; } transferred += writeRet; remaining -= writeRet; diff --git a/libminifi/include/io/ZlibStream.h b/libminifi/include/io/ZlibStream.h index ab4d419..b146413 100644 --- a/libminifi/include/io/ZlibStream.h +++ b/libminifi/include/io/ZlibStream.h @@ -76,7 +76,7 @@ class ZlibCompressStream : public ZlibBaseStream { ~ZlibCompressStream() override; - int write(const uint8_t* value, int size) override; + size_t write(const uint8_t* value, size_t size) override; void close() override; @@ -95,7 +95,7 @@ class ZlibDecompressStream : public ZlibBaseStream { ~ZlibDecompressStream() override; - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; private: std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<ZlibDecompressStream>::getLogger()}; diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h index 5f285c1..2953711 100644 --- a/libminifi/include/io/tls/SecureDescriptorStream.h +++ b/libminifi/include/io/tls/SecureDescriptorStream.h @@ -81,7 +81,7 @@ class SecureDescriptorStream : public io::BaseStream { * @param value value to write * @param size size of value */ - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; protected: std::recursive_mutex file_lock_; diff --git a/libminifi/include/io/tls/TLSServerSocket.h b/libminifi/include/io/tls/TLSServerSocket.h index 13eaf0c..53eb3b1 100644 --- a/libminifi/include/io/tls/TLSServerSocket.h +++ b/libminifi/include/io/tls/TLSServerSocket.h @@ -54,9 +54,9 @@ class TLSServerSocket : public BaseServerSocket, public TLSSocket { * Registers a call back and starts the read for the server socket. */ void registerCallback( - std::function<bool()> accept_function, - std::function<int(std::vector<uint8_t>*, int *)> handler, - std::chrono::milliseconds timeout = std::chrono::milliseconds(3000)); + std::function<bool()> accept_function, + std::function<size_t(std::vector<uint8_t>*)> handler, + std::chrono::milliseconds timeout = std::chrono::milliseconds(3000)); /** * Initializes the socket @@ -65,8 +65,6 @@ class TLSServerSocket : public BaseServerSocket, public TLSSocket { void registerCallback(std::function<bool()> accept_function, std::function<void(io::BaseStream *)> handler) override; private: - std::function<void(std::function<bool()> accept_function, std::function<int(std::vector<uint8_t>*, int *)> handler, std::chrono::milliseconds timeout)> fx; - void close_fd(int fd); std::atomic<bool> running_; diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index 24df1d9..271de46 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -151,12 +151,12 @@ class TLSSocket : public Socket { * @param buflen buffer to write * */ - int write(const uint8_t *value, int size) override; + size_t write(const uint8_t *value, size_t size) override; void close() override; protected: - int writeData(const uint8_t *value, unsigned int size, int fd); + size_t writeData(const uint8_t *value, size_t size, int fd); SSL *get_ssl(int fd) { if (UNLIKELY(listeners_ > 0)) { diff --git a/libminifi/include/serialization/FlowFileV3Serializer.h b/libminifi/include/serialization/FlowFileV3Serializer.h index a6eb2cf..06a284c 100644 --- a/libminifi/include/serialization/FlowFileV3Serializer.h +++ b/libminifi/include/serialization/FlowFileV3Serializer.h @@ -34,9 +34,9 @@ class FlowFileV3Serializer : public FlowFileSerializer { static constexpr uint16_t MAX_2_BYTE_VALUE = (std::numeric_limits<uint16_t>::max)(); - static int writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out); + static size_t writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out); - static int writeString(const std::string& str, const std::shared_ptr<io::OutputStream>& out); + static size_t writeString(const std::string& str, const std::shared_ptr<io::OutputStream>& out); public: using FlowFileSerializer::FlowFileSerializer; diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h index d2138ea..cea5e88 100644 --- a/libminifi/include/sitetosite/Peer.h +++ b/libminifi/include/sitetosite/Peer.h @@ -295,7 +295,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { using BaseStream::write; using BaseStream::read; - int write(const uint8_t* data, int len) override { + size_t write(const uint8_t* data, size_t len) override { return stream_->write(data, len); } diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index d142bfa..cffda6c 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -308,7 +308,7 @@ class ReadCallback : public InputStreamCallback { if (readSize == 0) break; if (io::isError(readSize)) return -1; const auto ret = _packet->transaction_->getStream().write(buffer, readSize); - if (ret < 0 || gsl::narrow<size_t>(ret) != readSize) { + if (io::isError(ret) || gsl::narrow<size_t>(ret) != readSize) { logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret; return -1; } diff --git a/libminifi/include/utils/GeneralUtils.h b/libminifi/include/utils/GeneralUtils.h index bd70412..6a766c9 100644 --- a/libminifi/include/utils/GeneralUtils.h +++ b/libminifi/include/utils/GeneralUtils.h @@ -19,6 +19,7 @@ #ifndef LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_ #define LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_ +#include <algorithm> #include <memory> #include <type_traits> #include <utility> @@ -174,6 +175,7 @@ auto invoke(F&& f, Args&&... args) MINIFICPP_UTIL_DEDUCED(detail::invoke_impl(st using std::invoke #endif /* < C++17 */ + namespace detail { struct dereference_t { template<typename T> diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 89096a9..db080de 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -70,69 +70,82 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(const std::string& k } bool FlowFileRecord::Serialize(io::OutputStream &outStream) { - int ret; - - ret = outStream.write(event_time_); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(event_time_); + if (ret != 8) { + return false; + } } - - ret = outStream.write(entry_date_); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(entry_date_); + if (ret != 8) { + return false; + } } - - ret = outStream.write(lineage_start_date_); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(lineage_start_date_); + if (ret != 8) { + return false; + } } - - ret = outStream.write(uuid_); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(uuid_); + if (ret == 0 || io::isError(ret)) { + return false; + } } - utils::Identifier containerId; if (connection_) { containerId = connection_->getUUID(); } - ret = outStream.write(containerId); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(containerId); + if (ret == 0 || io::isError(ret)) { + return false; + } } // write flow attributes - uint32_t numAttributes = gsl::narrow<uint32_t>(attributes_.size()); - ret = outStream.write(numAttributes); - if (ret != 4) { - return false; + { + const auto numAttributes = gsl::narrow<uint32_t>(attributes_.size()); + const auto ret = outStream.write(numAttributes); + if (ret != 4) { + return false; + } } for (auto& itAttribute : attributes_) { - ret = outStream.write(itAttribute.first, true); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(itAttribute.first, true); + if (ret == 0 || io::isError(ret)) { + return false; + } } - ret = outStream.write(itAttribute.second, true); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(itAttribute.second, true); + if (ret == 0 || io::isError(ret)) { + return false; + } } } - ret = outStream.write(getContentFullPath()); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(getContentFullPath()); + if (ret == 0 || io::isError(ret)) { + return false; + } } - - ret = outStream.write(size_); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(size_); + if (ret != 8) { + return false; + } } - - ret = outStream.write(offset_); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(offset_); + if (ret != 8) { + return false; + } } - return true; } diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp index 049a3d0..63e4b0e 100644 --- a/libminifi/src/c2/ControllerSocketProtocol.cpp +++ b/libminifi/src/c2/ControllerSocketProtocol.cpp @@ -177,7 +177,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro io::BufferStream resp; resp.write(&head, 1); resp.write(response.str()); - stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size())); + stream->write(resp.getBuffer(), resp.size()); } else if (what == "components") { io::BufferStream resp; resp.write(&head, 1); @@ -187,7 +187,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro resp.write(component->getComponentName()); resp.write(component->isRunning() ? "true" : "false"); } - stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size())); + stream->write(resp.getBuffer(), resp.size()); } else if (what == "jstack") { io::BufferStream resp; resp.write(&head, 1); @@ -203,7 +203,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro resp.write(line); } } - stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size())); + stream->write(resp.getBuffer(), resp.size()); } else if (what == "connections") { io::BufferStream resp; resp.write(&head, 1); @@ -212,7 +212,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro for (const auto &connection : queue_full_) { resp.write(connection.first, false); } - stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size())); + stream->write(resp.getBuffer(), resp.size()); } else if (what == "getfull") { std::vector<std::string> full_connections; { @@ -230,7 +230,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro for (const auto& conn : full_connections) { resp.write(conn); } - stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size())); + stream->write(resp.getBuffer(), resp.size()); } } break; diff --git a/libminifi/src/core/ContentSession.cpp b/libminifi/src/core/ContentSession.cpp index 14221ca..74ffcff 100644 --- a/libminifi/src/core/ContentSession.cpp +++ b/libminifi/src/core/ContentSession.cpp @@ -72,8 +72,8 @@ void ContentSession::commit() { if (outStream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath()); } - const int size = gsl::narrow<int>(resource.second->size()); - const int bytes_written = outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size); + const auto size = resource.second->size(); + const auto bytes_written = outStream->write(resource.second->getBuffer(), size); if (bytes_written != size) { throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath()); } @@ -83,8 +83,8 @@ void ContentSession::commit() { if (outStream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath()); } - const int size = gsl::narrow<int>(resource.second->size()); - const int bytes_written = outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size); + const auto size = resource.second->size(); + const auto bytes_written = outStream->write(resource.second->getBuffer(), size); if (bytes_written != size) { throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath()); } diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 5e3454d..887798e 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -407,12 +407,12 @@ void ProcessSession::import(std::string source, const std::shared_ptr<FlowFile> while (input.good()) { input.read(reinterpret_cast<char*>(charBuffer.data()), size); if (input) { - if (stream->write(charBuffer.data(), gsl::narrow<int>(size)) < 0) { + if (io::isError(stream->write(charBuffer.data(), size))) { invalidWrite = true; break; } } else { - if (stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), gsl::narrow<int>(input.gcount())) < 0) { + if (io::isError(stream->write(reinterpret_cast<uint8_t*>(charBuffer.data()), gsl::narrow<size_t>(input.gcount())))) { invalidWrite = true; break; } @@ -489,7 +489,7 @@ void ProcessSession::import(const std::string& source, std::vector<std::shared_p while (true) { startTime = utils::timeutils::getTimeMillis(); uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter)); - const auto len = gsl::narrow<int>(delimiterPos - begin); + const auto len = gsl::narrow<size_t>(delimiterPos - begin); logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end); /* diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp index bd0a9d2..7e878d2 100644 --- a/libminifi/src/io/BufferStream.cpp +++ b/libminifi/src/io/BufferStream.cpp @@ -28,8 +28,7 @@ namespace nifi { namespace minifi { namespace io { -int BufferStream::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); +size_t BufferStream::write(const uint8_t *value, size_t size) { size_t originalSize = buffer_.size(); buffer_.resize(originalSize + size); std::memcpy(buffer_.data() + originalSize, value, size); diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index 57c6915..d26a01b 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -501,25 +501,22 @@ std::string Socket::getHostname() const { // data stream overrides -int Socket::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); - - int ret = 0, bytes = 0; - +size_t Socket::write(const uint8_t *value, size_t size) { + size_t bytes = 0; int fd = select_descriptor(1000); - if (fd < 0) { return -1; } + if (fd < 0) { return STREAM_ERROR; } while (bytes < size) { - ret = send(fd, reinterpret_cast<const char*>(value) + bytes, size - bytes, 0); + const auto send_ret = send(fd, reinterpret_cast<const char*>(value) + bytes, size - bytes, 0); // check for errors - if (ret <= 0) { + if (send_ret <= 0) { utils::file::FileUtils::close(fd); logger_->log_error("Could not send to %d, error: %s", fd, get_last_socket_error_message()); - return ret; + return STREAM_ERROR; } - bytes += ret; + bytes += gsl::narrow<size_t>(send_ret); } - if (ret) + if (bytes > 0) logger_->log_trace("Send data size %d over socket %d", size, fd); total_written_ += bytes; return bytes; diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp index a3a4ee8..64c0840 100644 --- a/libminifi/src/io/DescriptorStream.cpp +++ b/libminifi/src/io/DescriptorStream.cpp @@ -49,24 +49,18 @@ void DescriptorStream::seek(size_t offset) { #endif } -int DescriptorStream::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); - if (size == 0) { - return 0; - } - if (!IsNullOrEmpty(value)) { - std::lock_guard<std::recursive_mutex> lock(file_lock_); +size_t DescriptorStream::write(const uint8_t *value, size_t size) { + if (size == 0) return 0; + if (IsNullOrEmpty(value)) return STREAM_ERROR; + std::lock_guard<std::recursive_mutex> lock(file_lock_); #ifdef WIN32 - if (_write(fd_, value, size) != size) { + if (static_cast<size_t>(_write(fd_, value, size)) != size) { #else - if (::write(fd_, value, size) != size) { + if (static_cast<size_t>(::write(fd_, value, size)) != size) { #endif - return -1; - } else { - return size; - } + return STREAM_ERROR; } else { - return -1; + return size; } } diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp index 90fb0a2..c5470b4 100644 --- a/libminifi/src/io/FileStream.cpp +++ b/libminifi/src/io/FileStream.cpp @@ -113,35 +113,30 @@ void FileStream::seek(size_t offset) { logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKP_CALL_ERROR_MSG; } -int FileStream::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); - if (size == 0) { - return 0; - } - if (!IsNullOrEmpty(value)) { - std::lock_guard<std::mutex> lock(file_lock_); - if (file_stream_ == nullptr || !file_stream_->is_open()) { - logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG; - return -1; - } - if (file_stream_->write(reinterpret_cast<const char*>(value), size)) { - offset_ += size; - if (offset_ > length_) { - length_ = offset_; - } - if (!file_stream_->flush()) { - logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << FLUSH_CALL_ERROR_MSG; - return -1; - } - return size; - } else { - logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << WRITE_CALL_ERROR_MSG; - return -1; - } - } else { +size_t FileStream::write(const uint8_t *value, size_t size) { + if (size == 0) return 0; + if (IsNullOrEmpty(value)) { logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << EMPTY_MESSAGE_ERROR_MSG; - return -1; + return STREAM_ERROR; + } + std::lock_guard<std::mutex> lock(file_lock_); + if (file_stream_ == nullptr || !file_stream_->is_open()) { + logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG; + return STREAM_ERROR; + } + if (!file_stream_->write(reinterpret_cast<const char*>(value), gsl::narrow<std::streamsize>(size))) { + logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << WRITE_CALL_ERROR_MSG; + return STREAM_ERROR; + } + offset_ += size; + if (offset_ > length_) { + length_ = offset_; + } + if (!file_stream_->flush()) { + logging::LOG_ERROR(logger_) << WRITE_ERROR_MSG << FLUSH_CALL_ERROR_MSG; + return STREAM_ERROR; } + return size; } size_t FileStream::read(uint8_t *buf, size_t buflen) { diff --git a/libminifi/src/io/OutputStream.cpp b/libminifi/src/io/OutputStream.cpp index 355cea7..c19fff6 100644 --- a/libminifi/src/io/OutputStream.cpp +++ b/libminifi/src/io/OutputStream.cpp @@ -30,43 +30,43 @@ namespace nifi { namespace minifi { namespace io { -int OutputStream::write(const std::vector<uint8_t>& buffer, int len) { - if (buffer.size() < gsl::narrow<size_t>(len)) { - return -1; +size_t OutputStream::write(const std::vector<uint8_t>& buffer, size_t len) { + if (buffer.size() < len) { + return STREAM_ERROR; } return write(buffer.data(), len); } -int OutputStream::write(bool value) { +size_t OutputStream::write(bool value) { uint8_t temp = value; return write(&temp, 1); } -int OutputStream::write(const utils::Identifier &value) { +size_t OutputStream::write(const utils::Identifier &value) { return write(value.to_string()); } -int OutputStream::write(const std::string& str, bool widen) { +size_t OutputStream::write(const std::string& str, bool widen) { return write_str(str.c_str(), gsl::narrow<uint32_t>(str.length()), widen); } -int OutputStream::write(const char* str, bool widen) { +size_t OutputStream::write(const char* str, bool widen) { return write_str(str, gsl::narrow<uint32_t>(std::strlen(str)), widen); } -int OutputStream::write_str(const char* str, uint32_t len, bool widen) { - int ret = 0; +size_t OutputStream::write_str(const char* str, uint32_t len, bool widen) { + size_t ret = 0; if (!widen) { - uint16_t shortLen = len; + const auto shortLen = gsl::narrow_cast<uint16_t>(len); if (len != shortLen) { - return -1; + return STREAM_ERROR; } ret = write(shortLen); } else { ret = write(len); } - if (ret <= 0) { + if (ret == 0 || isError(ret)) { return ret; } diff --git a/libminifi/src/io/ZlibStream.cpp b/libminifi/src/io/ZlibStream.cpp index aaf66b9..711a31d 100644 --- a/libminifi/src/io/ZlibStream.cpp +++ b/libminifi/src/io/ZlibStream.cpp @@ -61,15 +61,14 @@ ZlibCompressStream::~ZlibCompressStream() { } } -int ZlibCompressStream::write(const uint8_t* value, int size) { - gsl_Expects(size >= 0); +size_t ZlibCompressStream::write(const uint8_t* value, size_t size) { if (state_ != ZlibStreamState::INITIALIZED) { logger_->log_error("writeData called in invalid ZlibCompressStream state, state is %hhu", state_); - return -1; + return STREAM_ERROR; } strm_.next_in = const_cast<uint8_t*>(value); - strm_.avail_in = size; + strm_.avail_in = gsl::narrow<uInt>(size); /* * deflate consumes all input data it can (i.e. if it has enough output buffer it never leaves input data unconsumed) @@ -92,14 +91,14 @@ int ZlibCompressStream::write(const uint8_t* value, int size) { if (ret == Z_STREAM_ERROR) { logger_->log_error("deflate failed, error code: %d", ret); state_ = ZlibStreamState::ERRORED; - return -1; + return STREAM_ERROR; } - int output_size = gsl::narrow<int>(outputBuffer_.size() - strm_.avail_out); + const auto output_size = outputBuffer_.size() - strm_.avail_out; logger_->log_trace("deflate produced %d B of output data", output_size); if (output_->write(outputBuffer_.data(), output_size) != output_size) { logger_->log_error("Failed to write to underlying stream"); state_ = ZlibStreamState::ERRORED; - return -1; + return STREAM_ERROR; } } while (strm_.avail_out == 0); @@ -131,15 +130,14 @@ ZlibDecompressStream::~ZlibDecompressStream() { } } -int ZlibDecompressStream::write(const uint8_t* value, int size) { - gsl_Expects(size >= 0); +size_t ZlibDecompressStream::write(const uint8_t* value, size_t size) { if (state_ != ZlibStreamState::INITIALIZED) { logger_->log_error("writeData called in invalid ZlibDecompressStream state, state is %hhu", state_); - return -1; + return STREAM_ERROR; } strm_.next_in = const_cast<uint8_t*>(value); - strm_.avail_in = size; + strm_.avail_in = gsl::narrow<uInt>(size); /* * inflate works similarly to deflate in that it will not leave input data unconsumed, and we have to watch avail_out, @@ -160,14 +158,14 @@ int ZlibDecompressStream::write(const uint8_t* value, int size) { ret == Z_MEM_ERROR) { logger_->log_error("inflate failed, error code: %d", ret); state_ = ZlibStreamState::ERRORED; - return -1; + return STREAM_ERROR; } - int output_size = gsl::narrow<int>(outputBuffer_.size() - strm_.avail_out); + const auto output_size = outputBuffer_.size() - strm_.avail_out; logger_->log_trace("deflate produced %d B of output data", output_size); if (output_->write(outputBuffer_.data(), output_size) != output_size) { logger_->log_error("Failed to write to underlying stream"); state_ = ZlibStreamState::ERRORED; - return -1; + return STREAM_ERROR; } } while (strm_.avail_out == 0); diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp index ea93dea..e7a84fc 100644 --- a/libminifi/src/io/tls/SecureDescriptorStream.cpp +++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp @@ -47,30 +47,24 @@ void SecureDescriptorStream::seek(size_t offset) { // data stream overrides -int SecureDescriptorStream::write(const uint8_t *value, int size) { - gsl_Expects(size >= 0); +size_t SecureDescriptorStream::write(const uint8_t *value, size_t size) { if (size == 0) { return 0; } - if (!IsNullOrEmpty(value)) { - std::lock_guard<std::recursive_mutex> lock(file_lock_); - int bytes = 0; - int sent = 0; - while (bytes < size) { - sent = SSL_write(ssl_, value + bytes, size - bytes); - // check for errors - if (sent < 0) { - int ret = 0; - ret = SSL_get_error(ssl_, sent); - logger_->log_error("WriteData socket %d send failed %s %d", fd_, strerror(errno), ret); - return sent; - } - bytes += sent; - } - return size; - } else { - return -1; + if (IsNullOrEmpty(value)) return STREAM_ERROR; + std::lock_guard<std::recursive_mutex> lock(file_lock_); + size_t bytes = 0; + while (bytes < size) { + const auto write_status = SSL_write(ssl_, value + bytes, gsl::narrow_cast<int>(size - bytes)); + // check for errors + if (write_status < 0) { + const auto ret = SSL_get_error(ssl_, write_status); + logger_->log_error("WriteData socket %d send failed %s %d", fd_, strerror(errno), ret); + return STREAM_ERROR; + } + bytes += gsl::narrow<size_t>(write_status); } + return size; } size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) { diff --git a/libminifi/src/io/tls/TLSServerSocket.cpp b/libminifi/src/io/tls/TLSServerSocket.cpp index ed9cdc5..ae9d692 100644 --- a/libminifi/src/io/tls/TLSServerSocket.cpp +++ b/libminifi/src/io/tls/TLSServerSocket.cpp @@ -33,6 +33,7 @@ #include <cstdio> #include <memory> #include <chrono> +#include <limits> #include <thread> #include <utility> #include <vector> @@ -44,6 +45,8 @@ #include "core/logging/LoggerConfiguration.h" #include "io/tls/SecureDescriptorStream.h" #include "io/validation.h" +#include "utils/GeneralUtils.h" +#include "utils/gsl.h" namespace org { namespace apache { @@ -87,47 +90,55 @@ void TLSServerSocket::registerCallback(std::function<bool()> accept_function, st * Initializes the socket * @return result of the creation operation. */ -void TLSServerSocket::registerCallback(std::function<bool()> accept_function, std::function<int(std::vector<uint8_t>*, int *)> handler, std::chrono::milliseconds timeout) { - fx = [this](std::function<bool()> accept_function, std::function<int(std::vector<uint8_t>*, int *)> handler, std::chrono::milliseconds timeout) { - int ret = 0; - std::vector<int> fds; - int size; - while (accept_function()) { - int fd = select_descriptor(gsl::narrow<uint16_t>(timeout.count())); - if (fd > 0) { - int fd_remove = 0; - std::vector<uint8_t> data; - if ( handler(&data, &size) > 0 ) { - ret = writeData(data.data(), size, fd); - if (ret < 0) { - close_ssl(fd_remove); - } else { - fds.push_back(fd); - } - } - } else { - int fd_remove = 0; - for (auto &&fd : fds) { +void TLSServerSocket::registerCallback(std::function<bool()> accept_function, std::function<size_t(std::vector<uint8_t>*)> handler, std::chrono::milliseconds timeout) { + struct Fx { + void operator()() const { + std::vector<int> fds; + size_t size; + while (accept_function()) { + int fd = server_socket_->select_descriptor(gsl::narrow<uint16_t>(timeout.count())); + if (fd > 0) { + int fd_remove = 0; std::vector<uint8_t> data; - if ( handler(&data, &size) > 0 ) { - ret = writeData(data.data(), size, fd); - if (ret < 0) { - fd_remove = fd; - break; + size = handler(&data); + if (size > 0 && !io::isError(size)) { + const auto ret = server_socket_->writeData(data.data(), size, fd); + if (io::isError(ret)) { + server_socket_->close_ssl(fd_remove); + } else { + fds.push_back(fd); } } - } - if (fd_remove > 0) { - close_ssl(fd_remove); - fds.erase(std::remove(fds.begin(), fds.end(), fd_remove), fds.end()); + } else { + int fd_remove = 0; + for (auto &&fd : fds) { + std::vector<uint8_t> data; + size = handler(&data); + if (size > 0 && !io::isError(size)) { + const auto ret = server_socket_->writeData(data.data(), size, fd); + if (io::isError(ret)) { + fd_remove = fd; + break; + } + } + } + if (fd_remove > 0) { + server_socket_->close_ssl(fd_remove); + fds.erase(std::remove(fds.begin(), fds.end(), fd_remove), fds.end()); + } } } + for (auto &&fd : fds) { + server_socket_->close_ssl(fd); + } } - for (auto &&fd : fds) { - close_ssl(fd); - } + + TLSServerSocket* server_socket_; + std::function<bool()> accept_function; + std::function<size_t(std::vector<uint8_t>*)> handler; + std::chrono::milliseconds timeout; }; - server_read_thread_ = std::thread(fx, accept_function, handler, timeout); + server_read_thread_ = std::thread(Fx{this, std::move(accept_function), std::move(handler), timeout}); } void TLSServerSocket::close_fd(int fd) { diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index 93422fa..1cb6268 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -387,34 +387,32 @@ size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/) return total_read; } -int TLSSocket::writeData(const uint8_t *value, unsigned int size, int fd) { - unsigned int bytes = 0; - int sent = 0; +size_t TLSSocket::writeData(const uint8_t *value, size_t size, int fd) { + size_t bytes = 0; auto fd_ssl = get_ssl(fd); if (IsNullOrEmpty(fd_ssl)) { - return -1; + return STREAM_ERROR; } while (bytes < size) { - sent = SSL_write(fd_ssl, value + bytes, gsl::narrow<int>(size - bytes)); + const auto sent = SSL_write(fd_ssl, value + bytes, gsl::narrow<int>(size - bytes)); // check for errors if (sent < 0) { int ret = 0; ret = SSL_get_error(fd_ssl, sent); logger_->log_trace("WriteData socket %d send failed %s %d", fd, strerror(errno), ret); - - return sent; + return STREAM_ERROR; } logger_->log_trace("WriteData socket %d send succeed %d", fd, sent); - bytes += sent; + bytes += gsl::narrow<size_t>(sent); } - return gsl::narrow<int>(size); + return size; } -int TLSSocket::write(const uint8_t *value, int size) { - int fd = select_descriptor(1000); +size_t TLSSocket::write(const uint8_t *value, size_t size) { + const int fd = select_descriptor(1000); if (fd < 0) { close(); - return -1; + return STREAM_ERROR; } return writeData(value, size, fd); } diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index f3b0715..c3f6caf 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -88,134 +88,158 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Serializable } bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStream& outStream) { - int ret; - - ret = outStream.write(this->uuid_); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->uuid_); + if (ret == 0 || io::isError(ret)) { + return false; + } } - - uint32_t eventType = this->_eventType; - ret = outStream.write(eventType); - if (ret != 4) { - return false; + { + uint32_t eventType = this->_eventType; + const auto ret = outStream.write(eventType); + if (ret != 4) { + return false; + } } - - ret = outStream.write(this->_eventTime); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(this->_eventTime); + if (ret != 8) { + return false; + } } - - ret = outStream.write(this->_entryDate); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(this->_entryDate); + if (ret != 8) { + return false; + } } - - ret = outStream.write(this->_eventDuration); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(this->_eventDuration); + if (ret != 8) { + return false; + } } - - ret = outStream.write(this->_lineageStartDate); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(this->_lineageStartDate); + if (ret != 8) { + return false; + } } - - ret = outStream.write(this->_componentId); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->_componentId); + if (ret == 0 || io::isError(ret)) { + return false; + } } - - ret = outStream.write(this->_componentType); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->_componentType); + if (ret == 0 || io::isError(ret)) { + return false; + } } - - ret = outStream.write(this->flow_uuid_); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->flow_uuid_); + if (ret == 0 || io::isError(ret)) { + return false; + } } - - ret = outStream.write(this->_details); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->_details); + if (ret == 0 || io::isError(ret)) { + return false; + } } - // write flow attributes - uint32_t numAttributes = gsl::narrow<uint32_t>(this->_attributes.size()); - ret = outStream.write(numAttributes); - if (ret != 4) { - return false; + { + const auto numAttributes = gsl::narrow<uint32_t>(this->_attributes.size()); + const auto ret = outStream.write(numAttributes); + if (ret != 4) { + return false; + } } - for (const auto& itAttribute : _attributes) { - ret = outStream.write(itAttribute.first); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(itAttribute.first); + if (ret == 0 || io::isError(ret)) { + return false; + } } - ret = outStream.write(itAttribute.second); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(itAttribute.second); + if (ret == 0 || io::isError(ret)) { + return false; + } } } - - ret = outStream.write(this->_contentFullPath); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->_contentFullPath); + if (ret == 0 || io::isError(ret)) { + return false; + } } - - ret = outStream.write(this->_size); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(this->_size); + if (ret != 8) { + return false; + } } - - ret = outStream.write(this->_offset); - if (ret != 8) { - return false; + { + const auto ret = outStream.write(this->_offset); + if (ret != 8) { + return false; + } } - - ret = outStream.write(this->_sourceQueueIdentifier); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->_sourceQueueIdentifier); + if (ret == 0 || io::isError(ret)) { + return false; + } } - if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) { // write UUIDs - uint32_t parent_uuids_count = gsl::narrow<uint32_t>(this->_parentUuids.size()); - ret = outStream.write(parent_uuids_count); - if (ret != 4) { - return false; + { + const auto parent_uuids_count = gsl::narrow<uint32_t>(this->_parentUuids.size()); + const auto ret = outStream.write(parent_uuids_count); + if (ret != 4) { + return false; + } } for (const auto& parentUUID : _parentUuids) { - ret = outStream.write(parentUUID); - if (ret <= 0) { + const auto ret = outStream.write(parentUUID); + if (ret == 0 || io::isError(ret)) { return false; } } - uint32_t children_uuids_count = gsl::narrow<uint32_t>(this->_childrenUuids.size()); - ret = outStream.write(children_uuids_count); - if (ret != 4) { - return false; + { + const auto children_uuids_count = gsl::narrow<uint32_t>(this->_childrenUuids.size()); + const auto ret = outStream.write(children_uuids_count); + if (ret != 4) { + return false; + } } for (const auto& childUUID : _childrenUuids) { - ret = outStream.write(childUUID); - if (ret <= 0) { + const auto ret = outStream.write(childUUID); + if (ret == 0 || io::isError(ret)) { return false; } } } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { - ret = outStream.write(this->_transitUri); - if (ret <= 0) { + const auto ret = outStream.write(this->_transitUri); + if (ret == 0 || io::isError(ret)) { return false; } } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { - ret = outStream.write(this->_transitUri); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->_transitUri); + if (ret == 0 || io::isError(ret)) { + return false; + } } - ret = outStream.write(this->_sourceSystemFlowFileIdentifier); - if (ret <= 0) { - return false; + { + const auto ret = outStream.write(this->_sourceSystemFlowFileIdentifier); + if (ret == 0 || io::isError(ret)) { + return false; + } } } diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp b/libminifi/src/serialization/FlowFileV3Serializer.cpp index 1fa30a7..2407f87 100644 --- a/libminifi/src/serialization/FlowFileV3Serializer.cpp +++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp @@ -27,85 +27,78 @@ namespace minifi { constexpr uint8_t FlowFileV3Serializer::MAGIC_HEADER[]; -int FlowFileV3Serializer::writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out) { +size_t FlowFileV3Serializer::writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out) { if (length < MAX_2_BYTE_VALUE) { return out->write(static_cast<uint16_t>(length)); } - int sum = 0; - int ret; - ret = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE)); - if (ret < 0) { - return ret; + size_t sum = 0; + { + const auto ret = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE)); + if (io::isError(ret)) return ret; + sum += ret; } - sum += ret; - ret = out->write(static_cast<uint32_t>(length)); - if (ret < 0) { - return ret; + { + const auto ret = out->write(static_cast<uint32_t>(length)); + if (io::isError(ret)) return ret; + sum += ret; } - sum += ret; return sum; } -int FlowFileV3Serializer::writeString(const std::string &str, const std::shared_ptr<io::OutputStream> &out) { - int sum = 0; - int ret; - ret = writeLength(str.length(), out); - if (ret < 0) { - return ret; - } - sum += ret; - ret = out->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), gsl::narrow<int>(str.length())); - if (ret < 0) { - return ret; +size_t FlowFileV3Serializer::writeString(const std::string &str, const std::shared_ptr<io::OutputStream> &out) { + size_t sum = 0; + { + const auto ret = writeLength(str.length(), out); + if (io::isError(ret)) return ret; + sum += ret; } - if (gsl::narrow<size_t>(ret) != str.length()) { - return -1; + { + const auto ret = out->write(reinterpret_cast<const uint8_t*>(str.data()), str.length()); + if (io::isError(ret)) return ret; + if (ret != str.length()) return io::STREAM_ERROR; + sum += ret; } - sum += ret; return sum; } int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) { - int sum = 0; - int ret; - ret = out->write(const_cast<uint8_t*>(MAGIC_HEADER), sizeof(MAGIC_HEADER)); - if (ret < 0) { - return ret; - } - if (ret != sizeof(MAGIC_HEADER)) { - return -1; + size_t sum = 0; + { + const auto ret = out->write(MAGIC_HEADER, sizeof(MAGIC_HEADER)); + if (io::isError(ret)) return -1; + if (ret != sizeof(MAGIC_HEADER)) return -1; + sum += ret; } - sum += ret; const auto& attributes = flowFile->getAttributes(); - ret = writeLength(attributes.size(), out); - if (ret < 0) { - return ret; + { + const auto ret = writeLength(attributes.size(), out); + if (io::isError(ret)) return -1; + sum += ret; } - sum += ret; for (const auto& attrIt : attributes) { - ret = writeString(attrIt.first, out); - if (ret < 0) { - return ret; + { + const auto ret = writeString(attrIt.first, out); + if (io::isError(ret)) return -1; + sum += ret; } - sum += ret; - ret = writeString(attrIt.second, out); - if (ret < 0) { - return ret; + { + const auto ret = writeString(attrIt.second, out); + if (io::isError(ret)) return -1; + sum += ret; } - sum += ret; } - ret = out->write(static_cast<uint64_t>(flowFile->getSize())); - if (ret < 0) { - return ret; + { + const auto ret = out->write(static_cast<uint64_t>(flowFile->getSize())); + if (io::isError(ret)) return -1; + sum += ret; } - sum += ret; InputStreamPipe pipe(out); - ret = reader_(flowFile, &pipe); - if (ret < 0) { - return ret; + { + const auto ret = reader_(flowFile, &pipe); + if (ret < 0) return -1; + sum += gsl::narrow<size_t>(ret); } - sum += ret; - return sum; + return gsl::narrow<int>(sum); } } /* namespace minifi */ diff --git a/libminifi/src/sitetosite/Peer.cpp b/libminifi/src/sitetosite/Peer.cpp index 3ed6b70..5a5be8a 100644 --- a/libminifi/src/sitetosite/Peer.cpp +++ b/libminifi/src/sitetosite/Peer.cpp @@ -45,7 +45,7 @@ bool SiteToSitePeer::Open() { * previously by the socket preference. */ if (!this->local_network_interface_.getInterface().empty()) { - auto socket = static_cast<io::Socket*>(stream_.get()); + auto* socket = dynamic_cast<io::Socket*>(stream_.get()); if (nullptr != socket) { socket->setInterface(io::NetworkInterface(local_network_interface_.getInterface(), nullptr)); } @@ -54,9 +54,8 @@ bool SiteToSitePeer::Open() { if (stream_->initialize() < 0) return false; - int data_size = gsl::narrow<int>(sizeof MAGIC_BYTES); - - if (stream_->write(reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size) != data_size) { + const auto data_size = sizeof MAGIC_BYTES; + if (stream_->write(reinterpret_cast<const uint8_t *>(MAGIC_BYTES), data_size) != data_size) { return false; } diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp index b60b2f4..f02f785 100644 --- a/libminifi/src/sitetosite/RawSocketProtocol.cpp +++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp @@ -117,7 +117,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() { { const auto ret = peer_->write(getResourceName()); logger_->log_trace("result of writing resource name is %i", ret); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { logger_->log_debug("result of writing resource name is %i", ret); // tearDown(); return false; @@ -126,7 +126,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() { { const auto ret = peer_->write(_currentVersion); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { logger_->log_debug("result of writing version is %i", ret); return false; } @@ -185,7 +185,7 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() { { const auto ret = peer_->write(getCodecResourceName()); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { logger_->log_debug("result of getCodecResourceName is %i", ret); return false; } @@ -193,7 +193,7 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() { { const auto ret = peer_->write(_currentCodecVersion); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { logger_->log_debug("result of _currentCodecVersion is %i", ret); return false; } @@ -248,7 +248,7 @@ bool RawSiteToSiteClient::handShake() { { const auto ret = peer_->write(_commsIdentifier); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { return false; } } @@ -268,7 +268,7 @@ bool RawSiteToSiteClient::handShake() { if (_currentVersion >= 3) { const auto ret = peer_->write(peer_->getURL()); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { return false; } } @@ -276,7 +276,7 @@ bool RawSiteToSiteClient::handShake() { { const auto size = gsl::narrow<uint32_t>(properties.size()); const auto ret = peer_->write(size); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { return false; } } @@ -285,13 +285,13 @@ bool RawSiteToSiteClient::handShake() { for (it = properties.begin(); it != properties.end(); it++) { { const auto ret = peer_->write(it->first); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { return false; } } { const auto ret = peer_->write(it->second); - if (ret <= 0) { + if (ret == 0 || io::isError(ret)) { return false; } } @@ -412,7 +412,8 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) { if (type >= MAX_REQUEST_TYPE) return -1; - return peer_->write(SiteToSiteRequest::RequestTypeStr[type]); + const auto write_result = peer_->write(SiteToSiteRequest::RequestTypeStr[type]); + return io::isError(write_result) ? -1 : gsl::narrow<int>(write_result); } int RawSiteToSiteClient::readRequestType(RequestType &type) { diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index 60cef54..f12a96d 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -80,29 +80,22 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) { RespondCodeContext *resCode = this->getRespondCodeContext(code); - if (!resCode) { - // Not a valid respond code return -1; } - uint8_t codeSeq[3]; - codeSeq[0] = CODE_SEQUENCE_VALUE_1; - codeSeq[1] = CODE_SEQUENCE_VALUE_2; - codeSeq[2] = (uint8_t) code; - - int ret = peer_->write(codeSeq, 3); - - if (ret != 3) - return -1; + { + const uint8_t codeSeq[3] { CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, static_cast<uint8_t>(code) }; + const auto ret = peer_->write(codeSeq, 3); + if (ret != 3) + return -1; + } if (resCode->hasDescription) { - ret = peer_->write(message); - if (ret > 0) { - return (3 + ret); - } else { - return ret; - } + const auto ret = peer_->write(message); + if (io::isError(ret)) return -1; + if (ret == 0) return 0; + return 3 + gsl::narrow<int>(ret); } else { return 3; } @@ -407,8 +400,6 @@ bool SiteToSiteClient::complete(const utils::Identifier& transactionID) { } int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacket *packet, const std::shared_ptr<core::FlowFile> &flowFile, const std::shared_ptr<core::ProcessSession> &session) { - int ret; - if (peer_state_ != READY) { bootstrap(); } @@ -418,7 +409,6 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke } auto it = this->known_transactions_.find(transactionID); - if (it == known_transactions_.end()) { return -1; } @@ -435,30 +425,34 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke } if (transaction->current_transfers_ > 0) { - ret = writeResponse(transaction, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); + const auto ret = writeResponse(transaction, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); if (ret <= 0) { return -1; } } // start to read the packet - uint32_t numAttributes = gsl::narrow<uint32_t>(packet->_attributes.size()); - ret = transaction->getStream().write(numAttributes); - if (ret != 4) { - return -1; + { + const auto numAttributes = gsl::narrow<uint32_t>(packet->_attributes.size()); + const auto ret = transaction->getStream().write(numAttributes); + if (ret != 4) { + return -1; + } } - std::map<std::string, std::string>::iterator itAttribute; - for (itAttribute = packet->_attributes.begin(); itAttribute != packet->_attributes.end(); itAttribute++) { - ret = transaction->getStream().write(itAttribute->first, true); - - if (ret <= 0) { - return -1; + for (const auto& attribute : packet->_attributes) { + { + const auto ret = transaction->getStream().write(attribute.first, true); + if (ret == 0 || io::isError(ret)) { + return -1; + } } - ret = transaction->getStream().write(itAttribute->second, true); - if (ret <= 0) { - return -1; + { + const auto ret = transaction->getStream().write(attribute.second, true); + if (ret == 0 || io::isError(ret)) { + return -1; + } } - logger_->log_debug("Site2Site transaction %s send attribute key %s value %s", transactionID.to_string(), itAttribute->first, itAttribute->second); + logger_->log_debug("Site2Site transaction %s send attribute key %s value %s", transactionID.to_string(), attribute.first, attribute.second); } bool flowfile_has_content = (flowFile != nullptr); @@ -472,7 +466,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke uint64_t len = 0; if (flowFile && flowfile_has_content) { len = flowFile->getSize(); - ret = transaction->getStream().write(len); + const auto ret = transaction->getStream().write(len); if (ret != 8) { logger_->log_debug("Failed to write content size!"); return -1; @@ -493,20 +487,22 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke } } else if (packet->payload_.length() > 0) { len = packet->payload_.length(); - - ret = transaction->getStream().write(len); - if (ret != 8) { - return -1; + { + const auto ret = transaction->getStream().write(len); + if (ret != 8) { + return -1; + } } - - ret = transaction->getStream().write(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), gsl::narrow<int>(len)); - if (ret != gsl::narrow<int64_t>(len)) { - logger_->log_debug("Failed to write payload size!"); - return -1; + { + const auto ret = transaction->getStream().write(reinterpret_cast<const uint8_t*>(packet->payload_.c_str()), gsl::narrow<size_t>(len)); + if (ret != gsl::narrow<size_t>(len)) { + logger_->log_debug("Failed to write payload size!"); + return -1; + } } packet->_size += len; } else if (flowFile && !flowfile_has_content) { - ret = transaction->getStream().write(len); // Indicate zero length + const auto ret = transaction->getStream().write(len); // Indicate zero length if (ret != 8) { logger_->log_debug("Failed to write content size (0)!"); return -1; diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp index a64c3ac..c64e779 100644 --- a/libminifi/src/utils/ByteArrayCallback.cpp +++ b/libminifi/src/utils/ByteArrayCallback.cpp @@ -45,7 +45,7 @@ int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& str stream->seek(0); std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]); auto written = readFully(buffer.get(), size_); - stream->write(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(written)); + stream->write(reinterpret_cast<uint8_t*>(buffer.get()), written); return gsl::narrow<int64_t>(stream->size()); } diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp index 4ad473d..a0408ab 100644 --- a/libminifi/test/archive-tests/MergeFileTests.cpp +++ b/libminifi/test/archive-tests/MergeFileTests.cpp @@ -88,7 +88,7 @@ class FixedBuffer : public minifi::InputStreamCallback { REQUIRE(size_ + len <= capacity_); int total_read = 0; do { - const auto ret = input.read(end(), len); + const size_t ret{ input.read(end(), len) }; if (ret == 0) break; if (minifi::io::isError(ret)) return -1; size_ += ret; @@ -679,7 +679,7 @@ TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Un } void writeString(const std::string& str, const std::shared_ptr<minifi::io::BaseStream>& out) { - out->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), gsl::narrow<int>(str.length())); + out->write(reinterpret_cast<const uint8_t*>(str.data()), str.length()); } TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") { diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp index 1bf7b0c..88aa006 100644 --- a/libminifi/test/persistence-tests/PersistenceTests.cpp +++ b/libminifi/test/persistence-tests/PersistenceTests.cpp @@ -96,7 +96,7 @@ struct TestFlow{ processor->onSchedule(processorContext.get(), new core::ProcessSessionFactory(processorContext)); } std::shared_ptr<core::FlowFile> write(const std::string& data) { - minifi::io::BufferStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), gsl::narrow<int>(data.length())); + minifi::io::BufferStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length()); core::ProcessSession sessionGenFlowFile(inputContext); std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create()); sessionGenFlowFile.importFrom(stream, flow); diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp index f63947b..50586ec 100644 --- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp +++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp @@ -47,8 +47,7 @@ class ContentSessionController : public TestController { }; const std::shared_ptr<minifi::io::BaseStream>& operator<<(const std::shared_ptr<minifi::io::BaseStream>& stream, const std::string& str) { - int length = gsl::narrow<int>(str.length()); - REQUIRE(stream->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), length) == length); + REQUIRE(stream->write(reinterpret_cast<const uint8_t*>(str.data()), str.length()) == str.length()); return stream; } @@ -57,7 +56,7 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr< uint8_t buffer[4096]{}; while (true) { const auto ret = stream->read(buffer, sizeof(buffer)); - REQUIRE(!minifi::io::isError(ret)); + REQUIRE_FALSE(minifi::io::isError(ret)); if (ret == 0) { break; } str += std::string{reinterpret_cast<char*>(buffer), ret}; } diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp index 712d9f8..68f3d6c 100644 --- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -65,7 +65,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") { // should not be able to write to the read stream // -1 will indicate that we were not able to write any data - REQUIRE(read_stream->write("other value") == -1); + REQUIRE(minifi::io::isError(read_stream->write("other value"))); } TEST_CASE("Delete Claim", "[TestDBCR2]") { diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp index 69336b8..d94dcce 100644 --- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp +++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp @@ -45,7 +45,9 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Verify simple operation") { std::string content = "banana"; minifi::io::RocksDbStream outStream("one", gsl::make_not_null(db.get()), true); outStream.write(content); - REQUIRE(outStream.write(content) > 0); + const auto second_write_result = outStream.write(content); + REQUIRE(second_write_result > 0); + REQUIRE_FALSE(minifi::io::isError(second_write_result)); minifi::io::RocksDbStream inStream("one", gsl::make_not_null(db.get())); std::string str; inStream.read(str); @@ -59,12 +61,14 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Write zero bytes") { minifi::io::RocksDbStream readonlyStream("two", gsl::make_not_null(db.get()), false); - REQUIRE(readonlyStream.write(nullptr, 0) == -1); + REQUIRE(minifi::io::isError(readonlyStream.write(nullptr, 0))); } TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") { minifi::io::RocksDbStream one("one", gsl::make_not_null(db.get()), true); - REQUIRE(one.write("banana") > 0); + const auto banana_write_result = one.write("banana"); + REQUIRE_FALSE(minifi::io::isError(banana_write_result)); + REQUIRE(banana_write_result > 0); minifi::io::RocksDbStream stream("one", gsl::make_not_null(db.get())); diff --git a/libminifi/test/sql-tests/SQLTestPlan.h b/libminifi/test/sql-tests/SQLTestPlan.h index f924d87..c69d847 100644 --- a/libminifi/test/sql-tests/SQLTestPlan.h +++ b/libminifi/test/sql-tests/SQLTestPlan.h @@ -55,7 +55,7 @@ class SQLTestPlan { if (content) { auto claim = std::make_shared<minifi::ResourceClaim>(plan_->getContentRepo()); auto content_stream = plan_->getContentRepo()->write(*claim); - int ret = content_stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(content->c_str())), content->length()); + const auto ret = content_stream->write(reinterpret_cast<const uint8_t*>(content->c_str()), content->length()); REQUIRE(ret == content->length()); flow_file->setOffset(0); flow_file->setSize(content->length()); diff --git a/libminifi/test/unit/CRCTests.cpp b/libminifi/test/unit/CRCTests.cpp index 17c62da..57da97a 100644 --- a/libminifi/test/unit/CRCTests.cpp +++ b/libminifi/test/unit/CRCTests.cpp @@ -26,7 +26,7 @@ TEST_CASE("Test CRC1", "[testcrc1]") { org::apache::nifi::minifi::io::BufferStream base; org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test(gsl::make_not_null(&base)); - test.write(reinterpret_cast<uint8_t*>(const_cast<char*>("cow")), 3); + test.write(reinterpret_cast<const uint8_t*>("cow"), 3); REQUIRE(2580823964 == test.getCRC()); } @@ -35,7 +35,7 @@ TEST_CASE("Test CRC2", "[testcrc2]") { org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test(gsl::make_not_null(&base)); std::string fox = "the quick brown fox jumped over the brown fox"; std::vector<uint8_t> charvect(fox.begin(), fox.end()); - test.write(charvect, gsl::narrow<int>(charvect.size())); + test.write(charvect, charvect.size()); REQUIRE(1922388889 == test.getCRC()); } @@ -74,8 +74,8 @@ TEST_CASE("CRCStream with initial crc = 0 is the same as without initial crc", " std::vector<uint8_t> textVector1(textString.begin(), textString.end()); std::vector<uint8_t> textVector2(textString.begin(), textString.end()); - test_noinit.write(textVector1, gsl::narrow<int>(textVector1.size())); - test_initzero.write(textVector2, gsl::narrow<int>(textVector2.size())); + test_noinit.write(textVector1, textVector1.size()); + test_initzero.write(textVector2, textVector2.size()); REQUIRE(test_noinit.getCRC() == test_initzero.getCRC()); } @@ -85,17 +85,17 @@ TEST_CASE("CRCStream: one long write is the same as writing in two pieces", "[in org::apache::nifi::minifi::io::BufferStream base_full; org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_full(gsl::make_not_null(&base_full), 0); std::vector<uint8_t> textVector_full(textString.begin(), textString.end()); - test_full.write(textVector_full, gsl::narrow<int>(textVector_full.size())); + test_full.write(textVector_full, textVector_full.size()); org::apache::nifi::minifi::io::BufferStream base_piece1; org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_piece1(gsl::make_not_null(&base_piece1), 0); std::vector<uint8_t> textVector_piece1(textString.begin(), textString.begin() + 15); - test_piece1.write(textVector_piece1, gsl::narrow<int>(textVector_piece1.size())); + test_piece1.write(textVector_piece1, textVector_piece1.size()); org::apache::nifi::minifi::io::BufferStream base_piece2; org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_piece2(gsl::make_not_null(&base_piece2), test_piece1.getCRC()); std::vector<uint8_t> textVector_piece2(textString.begin() + 15, textString.end()); - test_piece2.write(textVector_piece2, gsl::narrow<int>(textVector_piece2.size())); + test_piece2.write(textVector_piece2, textVector_piece2.size()); REQUIRE(test_full.getCRC() == test_piece2.getCRC()); } diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp index 57bae57..ed6f10d 100644 --- a/libminifi/test/unit/FileStreamTests.cpp +++ b/libminifi/test/unit/FileStreamTests.cpp @@ -53,7 +53,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") { stream.seek(4); - stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 4); + stream.write(reinterpret_cast<const uint8_t*>("file"), 4); stream.seek(0); @@ -91,7 +91,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") { stream.seek(4); - stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 0); + stream.write(reinterpret_cast<const uint8_t*>("file"), 0); stream.seek(0); @@ -276,7 +276,7 @@ TEST_CASE("Non-existing file read/write test") { minifi::io::FileStream stream(utils::file::concat_path(dir, "non_existing_file.txt"), 0, true); REQUIRE(test_controller.getLog().getInstance().contains("Error opening file", std::chrono::seconds(0))); REQUIRE(test_controller.getLog().getInstance().contains("No such file or directory", std::chrono::seconds(0))); - REQUIRE(stream.write("lorem ipsum", false) == -1); + REQUIRE(minifi::io::isError(stream.write("lorem ipsum", false))); REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0))); std::vector<uint8_t> readBuffer; stream.seek(0); @@ -296,7 +296,7 @@ TEST_CASE("Existing file read/write test") { } minifi::io::FileStream stream(path_to_existing_file, 0, true); REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error opening file", std::chrono::seconds(0))); - REQUIRE_FALSE(stream.write("dolor sit amet", false) == -1); + REQUIRE_FALSE(minifi::io::isError(stream.write("dolor sit amet", false))); REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0))); std::vector<uint8_t> readBuffer; stream.seek(0); @@ -341,6 +341,6 @@ TEST_CASE("Readonly filestream write test") { outfile.close(); } minifi::io::FileStream stream(path_to_file, 0, false); - REQUIRE(stream.write("dolor sit amet", false) == -1); + REQUIRE(minifi::io::isError(stream.write("dolor sit amet", false))); REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: write call on file stream failed", std::chrono::seconds(0))); } diff --git a/libminifi/test/unit/FlowFileSerializationTests.cpp b/libminifi/test/unit/FlowFileSerializationTests.cpp index fef1327..5b7254b 100644 --- a/libminifi/test/unit/FlowFileSerializationTests.cpp +++ b/libminifi/test/unit/FlowFileSerializationTests.cpp @@ -35,7 +35,7 @@ std::shared_ptr<minifi::FlowFileRecord> createEmptyFlowFile() { TEST_CASE("Payload Serializer", "[testPayload]") { std::string content = "flowFileContent"; auto contentStream = std::make_shared<minifi::io::BufferStream>(); - contentStream->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data())), gsl::narrow<int>(content.length())); + contentStream->write(reinterpret_cast<const uint8_t*>(content.data()), content.length()); auto result = std::make_shared<minifi::io::BufferStream>(); @@ -57,7 +57,7 @@ TEST_CASE("Payload Serializer", "[testPayload]") { TEST_CASE("FFv3 Serializer", "[testFFv3]") { std::string content = "flowFileContent"; auto contentStream = std::make_shared<minifi::io::BufferStream>(); - contentStream->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data())), gsl::narrow<int>(content.length())); + contentStream->write(reinterpret_cast<const uint8_t*>(content.data()), content.length()); auto result = std::make_shared<minifi::io::BufferStream>(); diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h index 6e913b9..05c7938 100644 --- a/libminifi/test/unit/SiteToSiteHelper.h +++ b/libminifi/test/unit/SiteToSiteHelper.h @@ -40,10 +40,10 @@ class SiteToSiteResponder : public minifi::io::BaseStream { } void push_response(const std::string& resp) { - server_responses_.write(reinterpret_cast<const uint8_t*>(resp.data()), gsl::narrow<int>(resp.length())); + server_responses_.write(reinterpret_cast<const uint8_t*>(resp.data()), resp.length()); } - int write(const uint8_t *value, int size) override { + size_t write(const uint8_t *value, size_t size) override { client_responses_.push(std::string(reinterpret_cast<const char*>(value), size)); return size; } diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index f4f3227..85970c9 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -27,17 +27,20 @@ #include "io/StreamFactory.h" #include "io/Sockets.h" #include "utils/ThreadPool.h" -using Sockets = org::apache::nifi::minifi::io::Socket; + +namespace minifi = org::apache::nifi::minifi; +namespace io = minifi::io; +using io::Socket; TEST_CASE("TestSocket", "[TestSocket1]") { - org::apache::nifi::minifi::io::Socket socket(std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()), Sockets::getMyHostName(), 8183); + Socket socket(std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()), Socket::getMyHostName(), 8183); REQUIRE(-1 == socket.initialize()); - REQUIRE(socket.getHostname().rfind(Sockets::getMyHostName(), 0) == 0); + REQUIRE(socket.getHostname().rfind(Socket::getMyHostName(), 0) == 0); socket.close(); } TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { - org::apache::nifi::minifi::io::Socket socket(std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()), Sockets::getMyHostName(), 8183); + Socket socket(std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()), Socket::getMyHostName(), 8183); REQUIRE(-1 == socket.initialize()); socket.write((const uint8_t*)nullptr, 0); @@ -45,7 +48,7 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - REQUIRE(-1 == socket.write(buffer, 1)); + REQUIRE(io::isError(socket.write(buffer, 1))); socket.close(); } @@ -53,12 +56,12 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()); - org::apache::nifi::minifi::io::ServerSocket server(socket_context, Sockets::getMyHostName(), 9183, 1); + std::shared_ptr<io::SocketContext> socket_context = std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()); + io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1); REQUIRE(-1 != server.initialize()); - org::apache::nifi::minifi::io::Socket client(socket_context, Sockets::getMyHostName(), 9183); + Socket client(socket_context, Socket::getMyHostName(), 9183); REQUIRE(-1 != client.initialize()); @@ -77,19 +80,19 @@ TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") { } TEST_CASE("TestGetHostName", "[TestSocket4]") { - REQUIRE(Sockets::getMyHostName().length() > 0); + REQUIRE(Socket::getMyHostName().length() > 0); } TEST_CASE("TestWriteEndian64", "[TestSocket5]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()); + std::shared_ptr<io::SocketContext> socket_context = std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()); - org::apache::nifi::minifi::io::ServerSocket server(socket_context, Sockets::getMyHostName(), 9183, 1); + io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1); REQUIRE(-1 != server.initialize()); - org::apache::nifi::minifi::io::Socket client(socket_context, Sockets::getMyHostName(), 9183); + Socket client(socket_context, Socket::getMyHostName(), 9183); REQUIRE(-1 != client.initialize()); @@ -110,12 +113,12 @@ TEST_CASE("TestWriteEndian32", "[TestSocket6]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()); + std::shared_ptr<io::SocketContext> socket_context = std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()); - org::apache::nifi::minifi::io::ServerSocket server(socket_context, Sockets::getMyHostName(), 9183, 1); + io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1); REQUIRE(-1 != server.initialize()); - org::apache::nifi::minifi::io::Socket client(socket_context, Sockets::getMyHostName(), 9183); + Socket client(socket_context, Socket::getMyHostName(), 9183); REQUIRE(-1 != client.initialize()); { @@ -145,13 +148,13 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket7]") { std::vector<uint8_t> buffer; buffer.push_back('a'); - std::shared_ptr<org::apache::nifi::minifi::io::SocketContext> socket_context = std::make_shared<org::apache::nifi::minifi::io::SocketContext>(std::make_shared<minifi::Configure>()); + std::shared_ptr<io::SocketContext> socket_context = std::make_shared<io::SocketContext>(std::make_shared<minifi::Configure>()); - org::apache::nifi::minifi::io::ServerSocket server(socket_context, Sockets::getMyHostName(), 9183, 1); + io::ServerSocket server(socket_context, Socket::getMyHostName(), 9183, 1); REQUIRE(-1 != server.initialize()); - org::apache::nifi::minifi::io::Socket client(socket_context, Sockets::getMyHostName(), 9183); + Socket client(socket_context, Socket::getMyHostName(), 9183); REQUIRE(-1 != client.initialize()); @@ -166,7 +169,7 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket7]") { client.close(); - REQUIRE(-1 == client.write(buffer, 1)); + REQUIRE(io::isError(client.write(buffer, 1))); server.close(); } @@ -182,7 +185,7 @@ bool createSocket() { std::this_thread::sleep_for(std::chrono::milliseconds { distribution(seed) }); for (int i = 0; i < 50; i++) { - std::shared_ptr<org::apache::nifi::minifi::io::TLSContext> socketA = std::make_shared<org::apache::nifi::minifi::io::TLSContext>(configuration); + std::shared_ptr<io::TLSContext> socketA = std::make_shared<io::TLSContext>(configuration); socketA->initialize(); } @@ -219,10 +222,10 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket8]") { TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") { std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(); configure->set("nifi.remote.input.secure", "false"); - auto factory = minifi::io::StreamFactory::getInstance(configure); - std::string host = Sockets::getMyHostName(); - minifi::io::Socket *socket = factory->createSocket(host, 10001).release(); - minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket); + auto factory = io::StreamFactory::getInstance(configure); + std::string host = Socket::getMyHostName(); + Socket *socket = factory->createSocket(host, 10001).release(); + io::TLSSocket *tls = dynamic_cast<io::TLSSocket*>(socket); REQUIRE(tls == nullptr); } @@ -233,10 +236,10 @@ TEST_CASE("TestTLSContextCreation2", "[TestSocket9]") { TEST_CASE("TestTLSContextCreationNullptr", "[TestSocket10]") { std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(); configure->set("nifi.remote.input.secure", "false"); - auto factory = minifi::io::StreamFactory::getInstance(configure); - std::string host = Sockets::getMyHostName(); - minifi::io::Socket *socket = factory->createSecureSocket(host, 10001, nullptr).release(); - minifi::io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket); + auto factory = io::StreamFactory::getInstance(configure); + std::string host = Socket::getMyHostName(); + io::Socket *socket = factory->createSecureSocket(host, 10001, nullptr).release(); + io::TLSSocket *tls = dynamic_cast<minifi::io::TLSSocket*>(socket); REQUIRE(tls == nullptr); } #endif // OPENSSL_SUPPORT diff --git a/libminifi/test/unit/ZlibStreamTests.cpp b/libminifi/test/unit/ZlibStreamTests.cpp index 3418448..dbf3b31 100644 --- a/libminifi/test/unit/ZlibStreamTests.cpp +++ b/libminifi/test/unit/ZlibStreamTests.cpp @@ -35,17 +35,17 @@ TEST_CASE("gzip compression and decompression", "[basic]") { SECTION("Empty") { } SECTION("Simple content in one write") { - int length = gsl::narrow<int>(strlen("foobar")); - REQUIRE(length == compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foobar")), length)); + const auto length = strlen("foobar"); + REQUIRE(length == compressStream.write(reinterpret_cast<const uint8_t*>("foobar"), length)); original += "foobar"; } SECTION("Simple content in two writes") { - int foo_length = gsl::narrow<int>(strlen("foo")); - REQUIRE(foo_length == compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foo")), foo_length)); + const auto foo_length = strlen("foo"); + REQUIRE(foo_length == compressStream.write(reinterpret_cast<const uint8_t*>("foo"), foo_length)); original += "foo"; - int bar_length = gsl::narrow<int>(strlen("bar")); - REQUIRE(bar_length == compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("bar")), bar_length)); + const auto bar_length = strlen("bar"); + REQUIRE(bar_length == compressStream.write(reinterpret_cast<const uint8_t*>("bar"), bar_length)); original += "bar"; } SECTION("Large data") { @@ -55,7 +55,7 @@ TEST_CASE("gzip compression and decompression", "[basic]") { for (size_t i = 0U; i < 1024U; i++) { std::generate(buf.begin(), buf.end(), [&](){return dist(gen);}); original += std::string(reinterpret_cast<const char*>(buf.data()), buf.size()); - REQUIRE(-1 != compressStream.write(buf.data(), gsl::narrow<int>(buf.size()))); + REQUIRE_FALSE(io::isError(compressStream.write(buf.data(), buf.size()))); } } @@ -71,7 +71,7 @@ TEST_CASE("gzip compression and decompression", "[basic]") { io::BufferStream decompressBuffer; io::ZlibDecompressStream decompressStream(gsl::make_not_null(&decompressBuffer)); - decompressStream.write(const_cast<uint8_t*>(compressBuffer.getBuffer()), gsl::narrow<int>(compressBuffer.size())); + decompressStream.write(compressBuffer.getBuffer(), compressBuffer.size()); REQUIRE(decompressStream.isFinished()); REQUIRE(original == std::string(reinterpret_cast<const char*>(decompressBuffer.getBuffer()), decompressBuffer.size())); @@ -86,17 +86,17 @@ TEST_CASE("gzip compression and decompression pipeline", "[basic]") { SECTION("Empty") { } SECTION("Simple content in one write") { - int foobar_length = gsl::narrow<int>(strlen("foobar")); - REQUIRE(foobar_length == compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foobar")), foobar_length)); + const auto foobar_length = strlen("foobar"); + REQUIRE(foobar_length == compressStream.write(reinterpret_cast<const uint8_t*>("foobar"), foobar_length)); original += "foobar"; } SECTION("Simple content in two writes") { - int foo_length = gsl::narrow<int>(strlen("foo")); - REQUIRE(foo_length == compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("foo")), foo_length)); + const auto foo_length = strlen("foo"); + REQUIRE(foo_length == compressStream.write(reinterpret_cast<const uint8_t*>("foo"), foo_length)); original += "foo"; - int bar_length = gsl::narrow<int>(strlen("bar")); - REQUIRE(bar_length == compressStream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("bar")), bar_length)); + const auto bar_length = strlen("bar"); + REQUIRE(bar_length == compressStream.write(reinterpret_cast<const uint8_t*>("bar"), bar_length)); original += "bar"; } SECTION("Large data") { @@ -106,8 +106,7 @@ TEST_CASE("gzip compression and decompression pipeline", "[basic]") { for (size_t i = 0U; i < 1024U; i++) { std::generate(buf.begin(), buf.end(), [&](){return dist(gen);}); original += std::string(reinterpret_cast<const char*>(buf.data()), buf.size()); - int buffer_size = gsl::narrow<int>(buf.size()); - REQUIRE(buffer_size == compressStream.write(buf.data(), buffer_size)); + REQUIRE(buf.size() == compressStream.write(buf.data(), buf.size())); } } diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index d3808a1..0b724d2 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -510,7 +510,7 @@ int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { } else { file_buffer fb = file_to_buffer(ff->contentLocation); stream = std::make_shared<minifi::io::BufferStream>(); - stream->write(fb.buffer, gsl::narrow<int>(fb.file_len)); + stream->write(fb.buffer, fb.file_len); free(fb.buffer); } } else { @@ -718,7 +718,7 @@ flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record * } else { ff_data->content_stream = std::make_shared<minifi::io::BufferStream>(); file_buffer fb = file_to_buffer(input_ff->contentLocation); - ff_data->content_stream->write(fb.buffer, gsl::narrow<int>(fb.file_len)); + ff_data->content_stream->write(fb.buffer, fb.file_len); free(fb.buffer); } @@ -745,7 +745,7 @@ flow_file_record *invoke_chunk(standalone_processor* proc, uint8_t* buf, uint64_ auto ff_data = std::make_shared<flowfile_input_params>(); ff_data->content_stream = std::make_shared<minifi::io::BufferStream>(); - ff_data->content_stream->write(buf, gsl::narrow<int>(size)); + ff_data->content_stream->write(buf, size); plan->runNextProcessor(nullptr, ff_data); while (plan->runNextProcessor()) {
