This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch MINIFICPP-1507 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit efd9dc647219c69439b2b785b83cdac2e04b5921 Author: Marton Szasz <[email protected]> AuthorDate: Thu Mar 25 12:11:59 2021 +0100 fix various issues --- extensions/libarchive/CompressContent.h | 39 ++++++++++++++++----------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h index aae9492..b93471f 100644 --- a/extensions/libarchive/CompressContent.h +++ b/extensions/libarchive/CompressContent.h @@ -44,7 +44,7 @@ namespace minifi { namespace processors { // CompressContent Class -class CompressContent: public core::Processor { +class CompressContent : public core::Processor { public: // Constructor /*! @@ -120,7 +120,7 @@ public: status_ = -1; return -1; } - read_size += ret; + read_size += gsl::narrow<uint64_t>(ret); } else { break; } @@ -134,26 +134,24 @@ public: std::shared_ptr<logging::Logger> logger_; }; // Nest Callback Class for read stream from flow for decompress - class ReadCallbackDecompress: public InputStreamCallback { - public: + struct ReadCallbackDecompress : InputStreamCallback { explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) : - flow_(std::move(flow)) { + flow_file(std::move(flow)) { } ~ReadCallbackDecompress() override = default; int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { - read_size_ = 0; - stream->seek(offset_); - const auto readRet = stream->read(buffer_, sizeof(buffer_)); - read_size_ = readRet; + stream->seek(offset); + const auto readRet = stream->read(buffer, sizeof(buffer)); + stream_read_result = readRet; if (!io::isError(readRet)) { - offset_ += readRet; + offset += readRet; } return gsl::narrow<int64_t>(readRet); } - size_t read_size_ = 0; - uint8_t buffer_[8192] = {0}; - size_t offset_ = 0; - std::shared_ptr<core::FlowFile> flow_; + size_t stream_read_result = 0; // read size or error code, to be checked with io::isError + uint8_t buffer[8192] = {0}; + size_t offset = 0; + std::shared_ptr<core::FlowFile> flow_file; }; // Nest Callback Class for write stream class WriteCallback: public OutputStreamCallback { @@ -192,8 +190,9 @@ public: static la_ssize_t archive_read(struct archive*, void *context, const void **buff) { auto *callback = (WriteCallback *) context; callback->session_->read(callback->flow_, &callback->readDecompressCb_); - *buff = callback->readDecompressCb_.buffer_; - return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_); + *buff = callback->readDecompressCb_.buffer; + if (io::isError(callback->readDecompressCb_.stream_read_result)) return -1; + return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.stream_read_result); } static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) { @@ -420,14 +419,14 @@ public: * @param sessionFactory process session factory that is used when creating * ProcessSession objects. */ - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; // OnTrigger method, implemented by NiFi CompressContent - virtual void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) { + void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override { } // OnTrigger method, implemented by NiFi CompressContent - virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session); + void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; // Initialize, over write by NiFi CompressContent - virtual void initialize(void); + void initialize() override; private: static std::string toMimeType(CompressionFormat format);
