This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f7f29cbc7f32a852912b2121c57a2e2ee81cf185 Author: Ferenc Gerlits <[email protected]> AuthorDate: Mon Jul 19 15:44:13 2021 +0200 MINIFICPP-1606 ProcessSession::read() should return int64_t Since ProcessSession::read() returns the size of the flow file, and its return type was int, we still couldn't handle flow files larger than around 2 GB. Closes #1131 Signed-off-by: Marton Szasz <[email protected]> --- extensions/libarchive/MergeContent.h | 6 ++++-- libminifi/include/core/ProcessSession.h | 2 +- libminifi/include/serialization/FlowFileSerializer.h | 4 ++-- libminifi/include/serialization/FlowFileV3Serializer.h | 2 +- libminifi/include/serialization/PayloadSerializer.h | 2 +- libminifi/src/core/ProcessSession.cpp | 4 ++-- libminifi/src/serialization/FlowFileV3Serializer.cpp | 4 ++-- libminifi/src/serialization/PayloadSerializer.cpp | 2 +- 8 files changed, 14 insertions(+), 12 deletions(-) diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h index 7fd852a..3d36652 100644 --- a/extensions/libarchive/MergeContent.h +++ b/extensions/libarchive/MergeContent.h @@ -77,11 +77,13 @@ class BinaryConcatenationMerge : public MergeBin { std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) : header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), serializer_(serializer) { } + std::string &header_; std::string &footer_; std::string &demarcator_; std::deque<std::shared_ptr<core::FlowFile>> &flows_; FlowFileSerializer& serializer_; + int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { size_t write_size_sum = 0; if (!header_.empty()) { @@ -98,7 +100,7 @@ class BinaryConcatenationMerge : public MergeBin { return -1; write_size_sum += write_ret; } - int len = serializer_.serialize(flow, stream); + const auto len = serializer_.serialize(flow, stream); if (len < 0) return len; write_size_sum += gsl::narrow<size_t>(len); @@ -231,7 +233,7 @@ class ArchiveMerge { } } } - int ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(arch, entry)); + const auto ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(arch, entry)); if (ret < 0) { return ret; } diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 302c8b1..5511790 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -94,7 +94,7 @@ class ProcessSession : public ReferenceContainer { // Remove Flow File void remove(const std::shared_ptr<core::FlowFile> &flow); // Execute the given read callback against the content - int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback); + int64_t read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback); // Execute the given write callback against the content void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback); // Replace content with buffer diff --git a/libminifi/include/serialization/FlowFileSerializer.h b/libminifi/include/serialization/FlowFileSerializer.h index 02f21e4..5393068 100644 --- a/libminifi/include/serialization/FlowFileSerializer.h +++ b/libminifi/include/serialization/FlowFileSerializer.h @@ -42,11 +42,11 @@ class InputStreamCallback; class FlowFileSerializer { public: - using FlowFileReader = std::function<int(const std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>; + using FlowFileReader = std::function<int64_t(const std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>; explicit FlowFileSerializer(FlowFileReader reader) : reader_(std::move(reader)) {} - virtual int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) = 0; + virtual int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) = 0; virtual ~FlowFileSerializer() = default; diff --git a/libminifi/include/serialization/FlowFileV3Serializer.h b/libminifi/include/serialization/FlowFileV3Serializer.h index 06a284c..656422a 100644 --- a/libminifi/include/serialization/FlowFileV3Serializer.h +++ b/libminifi/include/serialization/FlowFileV3Serializer.h @@ -41,7 +41,7 @@ class FlowFileV3Serializer : public FlowFileSerializer { public: using FlowFileSerializer::FlowFileSerializer; - int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override; + int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override; }; } /* namespace minifi */ diff --git a/libminifi/include/serialization/PayloadSerializer.h b/libminifi/include/serialization/PayloadSerializer.h index ed26e85..efa3384 100644 --- a/libminifi/include/serialization/PayloadSerializer.h +++ b/libminifi/include/serialization/PayloadSerializer.h @@ -31,7 +31,7 @@ class PayloadSerializer : public FlowFileSerializer { public: using FlowFileSerializer::FlowFileSerializer; - int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override; + int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override; }; } /* namespace minifi */ diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 887798e..3c57b68 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -290,7 +290,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS } } -int ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) { +int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) { try { std::shared_ptr<ResourceClaim> claim = nullptr; @@ -317,7 +317,7 @@ int ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStrea if (ret < 0) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } - return gsl::narrow<int>(ret); + return ret; } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp b/libminifi/src/serialization/FlowFileV3Serializer.cpp index 2407f87..588c36f 100644 --- a/libminifi/src/serialization/FlowFileV3Serializer.cpp +++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp @@ -61,7 +61,7 @@ size_t FlowFileV3Serializer::writeString(const std::string &str, const std::shar return sum; } -int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) { +int64_t FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) { size_t sum = 0; { const auto ret = out->write(MAGIC_HEADER, sizeof(MAGIC_HEADER)); @@ -98,7 +98,7 @@ int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& flowF if (ret < 0) return -1; sum += gsl::narrow<size_t>(ret); } - return gsl::narrow<int>(sum); + return gsl::narrow<int64_t>(sum); } } /* namespace minifi */ diff --git a/libminifi/src/serialization/PayloadSerializer.cpp b/libminifi/src/serialization/PayloadSerializer.cpp index 44b0ed4..8bfa112 100644 --- a/libminifi/src/serialization/PayloadSerializer.cpp +++ b/libminifi/src/serialization/PayloadSerializer.cpp @@ -24,7 +24,7 @@ namespace apache { namespace nifi { namespace minifi { -int PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) { +int64_t PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) { InputStreamPipe pipe(out); return reader_(flowFile, &pipe); }
