This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new bbf0bb1 MINIFICPP-1650: ProcessSession::append sets the flowfile size
incorrectly
bbf0bb1 is described below
commit bbf0bb1a4c3b093c42e5cb0f945d97ae00a74d5a
Author: Martin Zink <[email protected]>
AuthorDate: Fri Oct 1 13:55:26 2021 +0200
MINIFICPP-1650: ProcessSession::append sets the flowfile size incorrectly
Signed-off-by: Adam Debreceni <[email protected]>
This closes #1181
---
libminifi/src/core/ProcessSession.cpp | 9 +++++----
.../rocksdb-tests/DBContentRepositoryTests.cpp | 1 +
.../test/unit/ContentRepositoryDependentTests.h | 23 ++++++++++++++++++++++
libminifi/test/unit/ProcessSessionTests.cpp | 2 ++
4 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/libminifi/src/core/ProcessSession.cpp
b/libminifi/src/core/ProcessSession.cpp
index b31e63e..82d748d 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -269,14 +269,15 @@ void ProcessSession::append(const
std::shared_ptr<core::FlowFile> &flow, OutputS
}
// Call the callback to write the content
- size_t oldPos = stream->size();
+ size_t flow_file_size = flow->getSize();
+ size_t stream_size_before_callback = stream->size();
// this prevents an issue if we write, above, with zero length.
- if (oldPos > 0)
- stream->seek(oldPos + 1);
+ if (stream_size_before_callback > 0)
+ stream->seek(stream_size_before_callback + 1);
if (callback->process(stream) < 0) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile
content");
}
- flow->setSize(stream->size());
+ flow->setSize(flow_file_size + (stream->size() -
stream_size_before_callback));
std::stringstream details;
details << process_context_->getProcessorNode()->getName() << " modify
flow record content " << flow->getUUIDStr();
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 1b6383a..c456380 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -229,5 +229,6 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
TEST_CASE("ProcessSession::read reads the flowfile from offset to size",
"[readoffsetsize]") {
ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::DatabaseContentRepository>());
+
ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::DatabaseContentRepository>());
}
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h
b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 01c6563..09aa598 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -103,6 +103,13 @@ class Fixture {
process_session_->commit();
}
+ void appendAndCommit(const std::shared_ptr<core::FlowFile>& flow_file, const
std::string content_to_append) {
+ WriteStringToFlowFile callback(content_to_append);
+ process_session_->append(flow_file, &callback);
+ process_session_->transfer(flow_file, Success);
+ process_session_->commit();
+ }
+
private:
TestController test_controller_;
@@ -140,4 +147,20 @@ void
testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
CHECK(read_until_stream_size_callback.value_ == "bar");
CHECK(read_until_it_can_callback.value_ == "bar");
}
+
+void testAppendSize(std::shared_ptr<core::ContentRepository> content_repo) {
+ Fixture fixture = Fixture(content_repo);
+ core::ProcessSession& process_session = fixture.processSession();
+ fixture.commitFlowFile("my");
+ const auto flow_file = process_session.get();
+ fixture.appendAndCommit(flow_file, "foobar");
+ REQUIRE(flow_file);
+ CHECK(flow_file->getSize() == 8);
+ ReadUntilStreamSize read_until_stream_size_callback;
+ ReadUntilItCan read_until_it_can_callback;
+ process_session.read(flow_file, &read_until_stream_size_callback);
+ process_session.read(flow_file, &read_until_it_can_callback);
+ CHECK(read_until_stream_size_callback.value_ == "myfoobar");
+ CHECK(read_until_it_can_callback.value_ == "myfoobar");
+}
} // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp
b/libminifi/test/unit/ProcessSessionTests.cpp
index 5629d43..66f259b 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -99,4 +99,6 @@ TEST_CASE("ProcessSession::rollback penalizes affected
flowfiles", "[rollback]")
TEST_CASE("ProcessSession::read reads the flowfile from offset to size",
"[readoffsetsize]") {
ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::VolatileContentRepository>());
ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared<core::repository::FileSystemRepository>());
+
ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::VolatileContentRepository>());
+
ContentRepositoryDependentTests::testAppendSize(std::make_shared<core::repository::FileSystemRepository>());
}