This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f7f29cbc7f32a852912b2121c57a2e2ee81cf185
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Mon Jul 19 15:44:13 2021 +0200

    MINIFICPP-1606 ProcessSession::read() should return int64_t
    
    Since ProcessSession::read() returns the size of the flow file, and its return
    type was int, we still couldn't handle flow files larger than around 2 GB.
    
    Closes #1131
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 extensions/libarchive/MergeContent.h                   | 6 ++++--
 libminifi/include/core/ProcessSession.h                | 2 +-
 libminifi/include/serialization/FlowFileSerializer.h   | 4 ++--
 libminifi/include/serialization/FlowFileV3Serializer.h | 2 +-
 libminifi/include/serialization/PayloadSerializer.h    | 2 +-
 libminifi/src/core/ProcessSession.cpp                  | 4 ++--
 libminifi/src/serialization/FlowFileV3Serializer.cpp   | 4 ++--
 libminifi/src/serialization/PayloadSerializer.cpp      | 2 +-
 8 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/extensions/libarchive/MergeContent.h 
b/extensions/libarchive/MergeContent.h
index 7fd852a..3d36652 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -77,11 +77,13 @@ class BinaryConcatenationMerge : public MergeBin {
         std::deque<std::shared_ptr<core::FlowFile>> &flows, 
FlowFileSerializer& serializer) :
       header_(header), footer_(footer), demarcator_(demarcator), 
flows_(flows), serializer_(serializer) {
     }
+
     std::string &header_;
     std::string &footer_;
     std::string &demarcator_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
     FlowFileSerializer& serializer_;
+
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       size_t write_size_sum = 0;
       if (!header_.empty()) {
@@ -98,7 +100,7 @@ class BinaryConcatenationMerge : public MergeBin {
             return -1;
           write_size_sum += write_ret;
         }
-        int len = serializer_.serialize(flow, stream);
+        const auto len = serializer_.serialize(flow, stream);
         if (len < 0)
           return len;
         write_size_sum += gsl::narrow<size_t>(len);
@@ -231,7 +233,7 @@ class ArchiveMerge {
             }
           }
         }
-        int ret = serializer_.serialize(flow, 
std::make_shared<ArchiveWriter>(arch, entry));
+        const auto ret = serializer_.serialize(flow, 
std::make_shared<ArchiveWriter>(arch, entry));
         if (ret < 0) {
           return ret;
         }
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 302c8b1..5511790 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -94,7 +94,7 @@ class ProcessSession : public ReferenceContainer {
   // Remove Flow File
   void remove(const std::shared_ptr<core::FlowFile> &flow);
   // Execute the given read callback against the content
-  int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback 
*callback);
+  int64_t read(const std::shared_ptr<core::FlowFile> &flow, 
InputStreamCallback *callback);
   // Execute the given write callback against the content
   void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback 
*callback);
   // Replace content with buffer
diff --git a/libminifi/include/serialization/FlowFileSerializer.h 
b/libminifi/include/serialization/FlowFileSerializer.h
index 02f21e4..5393068 100644
--- a/libminifi/include/serialization/FlowFileSerializer.h
+++ b/libminifi/include/serialization/FlowFileSerializer.h
@@ -42,11 +42,11 @@ class InputStreamCallback;
 
 class FlowFileSerializer {
  public:
-  using FlowFileReader = std::function<int(const 
std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>;
+  using FlowFileReader = std::function<int64_t(const 
std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>;
 
   explicit FlowFileSerializer(FlowFileReader reader) : 
reader_(std::move(reader)) {}
 
-  virtual int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const 
std::shared_ptr<io::OutputStream>& out) = 0;
+  virtual int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, 
const std::shared_ptr<io::OutputStream>& out) = 0;
 
   virtual ~FlowFileSerializer() = default;
 
diff --git a/libminifi/include/serialization/FlowFileV3Serializer.h 
b/libminifi/include/serialization/FlowFileV3Serializer.h
index 06a284c..656422a 100644
--- a/libminifi/include/serialization/FlowFileV3Serializer.h
+++ b/libminifi/include/serialization/FlowFileV3Serializer.h
@@ -41,7 +41,7 @@ class FlowFileV3Serializer : public FlowFileSerializer {
  public:
   using FlowFileSerializer::FlowFileSerializer;
 
-  int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const 
std::shared_ptr<io::OutputStream>& out) override;
+  int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const 
std::shared_ptr<io::OutputStream>& out) override;
 };
 
 } /* namespace minifi */
diff --git a/libminifi/include/serialization/PayloadSerializer.h 
b/libminifi/include/serialization/PayloadSerializer.h
index ed26e85..efa3384 100644
--- a/libminifi/include/serialization/PayloadSerializer.h
+++ b/libminifi/include/serialization/PayloadSerializer.h
@@ -31,7 +31,7 @@ class PayloadSerializer : public FlowFileSerializer {
  public:
   using FlowFileSerializer::FlowFileSerializer;
 
-  int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const 
std::shared_ptr<io::OutputStream>& out) override;
+  int64_t serialize(const std::shared_ptr<core::FlowFile>& flowFile, const 
std::shared_ptr<io::OutputStream>& out) override;
 };
 
 } /* namespace minifi */
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 887798e..3c57b68 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -290,7 +290,7 @@ void ProcessSession::append(const 
std::shared_ptr<core::FlowFile> &flow, OutputS
   }
 }
 
-int ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, 
InputStreamCallback *callback) {
+int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, 
InputStreamCallback *callback) {
   try {
     std::shared_ptr<ResourceClaim> claim = nullptr;
 
@@ -317,7 +317,7 @@ int ProcessSession::read(const 
std::shared_ptr<core::FlowFile> &flow, InputStrea
     if (ret < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile 
content");
     }
-    return gsl::narrow<int>(ret);
+    return ret;
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp 
b/libminifi/src/serialization/FlowFileV3Serializer.cpp
index 2407f87..588c36f 100644
--- a/libminifi/src/serialization/FlowFileV3Serializer.cpp
+++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp
@@ -61,7 +61,7 @@ size_t FlowFileV3Serializer::writeString(const std::string 
&str, const std::shar
   return sum;
 }
 
-int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& 
flowFile, const std::shared_ptr<io::OutputStream>& out) {
+int64_t FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& 
flowFile, const std::shared_ptr<io::OutputStream>& out) {
   size_t sum = 0;
   {
     const auto ret = out->write(MAGIC_HEADER, sizeof(MAGIC_HEADER));
@@ -98,7 +98,7 @@ int FlowFileV3Serializer::serialize(const 
std::shared_ptr<core::FlowFile>& flowF
     if (ret < 0) return -1;
     sum += gsl::narrow<size_t>(ret);
   }
-  return gsl::narrow<int>(sum);
+  return gsl::narrow<int64_t>(sum);
 }
 
 } /* namespace minifi */
diff --git a/libminifi/src/serialization/PayloadSerializer.cpp 
b/libminifi/src/serialization/PayloadSerializer.cpp
index 44b0ed4..8bfa112 100644
--- a/libminifi/src/serialization/PayloadSerializer.cpp
+++ b/libminifi/src/serialization/PayloadSerializer.cpp
@@ -24,7 +24,7 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-int PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& 
flowFile, const std::shared_ptr<io::OutputStream>& out) {
+int64_t PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& 
flowFile, const std::shared_ptr<io::OutputStream>& out) {
   InputStreamPipe pipe(out);
   return reader_(flowFile, &pipe);
 }

Reply via email to