This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new bbf0bb1  MINIFICPP-1650: ProcessSession::append sets the flowfile size incorrectly
bbf0bb1 is described below

commit bbf0bb1a4c3b093c42e5cb0f945d97ae00a74d5a
Author: Martin Zink <[email protected]>
AuthorDate: Fri Oct 1 13:55:26 2021 +0200

    MINIFICPP-1650: ProcessSession::append sets the flowfile size incorrectly
    
    Signed-off-by: Adam Debreceni <[email protected]>
    
    This closes #1181
---
 libminifi/src/core/ProcessSession.cpp              |  9 +++++----
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  1 +
 .../test/unit/ContentRepositoryDependentTests.h    | 23 ++++++++++++++++++++++
 libminifi/test/unit/ProcessSessionTests.cpp        |  2 ++
 4 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index b31e63e..82d748d 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -269,14 +269,15 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
     }
     // Call the callback to write the content
 
-    size_t oldPos = stream->size();
+    size_t flow_file_size = flow->getSize();
+    size_t stream_size_before_callback = stream->size();
     // this prevents an issue if we write, above, with zero length.
-    if (oldPos > 0)
-      stream->seek(oldPos + 1);
+    if (stream_size_before_callback > 0)
+      stream->seek(stream_size_before_callback + 1);
     if (callback->process(stream) < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
-    flow->setSize(stream->size());
+    flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback));
 
     std::stringstream details;
     details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 1b6383a..c456380 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -229,5 +229,6 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
 
 TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
   ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::DatabaseContentRepository>());
+  ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::DatabaseContentRepository>());
 }
 
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 01c6563..09aa598 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -103,6 +103,13 @@ class Fixture {
     process_session_->commit();
   }
 
+  void appendAndCommit(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content_to_append) {
+    WriteStringToFlowFile callback(content_to_append);
+    process_session_->append(flow_file, &callback);
+    process_session_->transfer(flow_file, Success);
+    process_session_->commit();
+  }
+
  private:
   TestController test_controller_;
 
@@ -140,4 +147,20 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
   CHECK(read_until_stream_size_callback.value_ == "bar");
   CHECK(read_until_it_can_callback.value_ == "bar");
 }
+
+void testAppendSize(std::shared_ptr<core::ContentRepository> content_repo) {
+  Fixture fixture = Fixture(content_repo);
+  core::ProcessSession& process_session = fixture.processSession();
+  fixture.commitFlowFile("my");
+  const auto flow_file = process_session.get();
+  fixture.appendAndCommit(flow_file, "foobar");
+  REQUIRE(flow_file);
+  CHECK(flow_file->getSize() == 8);
+  ReadUntilStreamSize read_until_stream_size_callback;
+  ReadUntilItCan read_until_it_can_callback;
+  process_session.read(flow_file, &read_until_stream_size_callback);
+  process_session.read(flow_file, &read_until_it_can_callback);
+  CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+  CHECK(read_until_it_can_callback.value_ == "myfoobar");
+}
 }  // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 5629d43..66f259b 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -99,4 +99,6 @@ TEST_CASE("ProcessSession::rollback penalizes affected flowfiles", "[rollback]")
 TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[readoffsetsize]") {
   ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::VolatileContentRepository>());
   ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::FileSystemRepository>());
+  ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::VolatileContentRepository>());
+  ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::FileSystemRepository>());
 }

Reply via email to