This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d4d917afd15fa7760d6f29d2045245244d3b7dc0
Author: Adam Debreceni <[email protected]>
AuthorDate: Fri Aug 16 13:45:14 2024 +0000

    MINIFICPP-2427 - Copy whole content instead of creating a view
    
    Closes #1847
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../rocksdb-repos/tests/ContentSessionTests.cpp    |  7 ++--
 libminifi/include/core/ProcessSession.h            |  4 +++
 libminifi/src/core/BufferedContentSession.cpp      |  5 ++-
 libminifi/src/core/ProcessSession.cpp              | 37 ++++++++++++++--------
 libminifi/src/io/BufferStream.cpp                  |  1 +
 5 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp
index 8d35589e1..3a612b758 100644
--- a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp
+++ b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp
@@ -124,9 +124,10 @@ void test_template() {
 
   auto claim1 = session->create();
   session->write(claim1) << "hello content!";
-  // TODO(adebreceni): MINIFICPP-1954
-  if (is_buffered_session) {
-    REQUIRE_THROWS(session->read(claim1));
+  {
+    std::string content;
+    session->read(claim1) >> content;
+    REQUIRE(content == "hello content!");
   }
 
   auto claim2 = session->create();
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 912f0bef2..59130a7ce 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -94,10 +94,14 @@ class ProcessSession : public ReferenceContainer {
   std::shared_ptr<io::InputStream> getFlowFileContentStream(const 
core::FlowFile& flow_file);
   // Execute the given read callback against the content
   int64_t read(const std::shared_ptr<core::FlowFile>& flow_file, const 
io::InputStreamCallback& callback);
+
+  int64_t read(const core::FlowFile& flow_file, const io::InputStreamCallback& 
callback);
   // Read content into buffer
   detail::ReadBufferResult readBuffer(const std::shared_ptr<core::FlowFile>& 
flow);
   // Execute the given write callback against the content
   void write(const std::shared_ptr<core::FlowFile> &flow, const 
io::OutputStreamCallback& callback);
+
+  void write(core::FlowFile& flow, const io::OutputStreamCallback& callback);
   // Read and write the flow file at the same time (eg. for processing it line 
by line)
   int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, const 
io::InputOutputStreamCallback& callback);
   // Replace content with buffer
diff --git a/libminifi/src/core/BufferedContentSession.cpp b/libminifi/src/core/BufferedContentSession.cpp
index 11d370fbc..3afad2c8a 100644
--- a/libminifi/src/core/BufferedContentSession.cpp
+++ b/libminifi/src/core/BufferedContentSession.cpp
@@ -59,7 +59,10 @@ std::shared_ptr<io::BaseStream> BufferedContentSession::read(const std::shared_p
   // TODO(adebreceni):
   //  after the stream refactor is merged we should be able to share the 
underlying buffer
   //  between multiple InputStreams, moreover create a ConcatInputStream
-  if (managed_resources_.contains(resource_id) || 
append_state_.contains(resource_id)) {
+  if (auto it = managed_resources_.find(resource_id); it != 
managed_resources_.end()) {
+    return it->second;
+  }
+  if (append_state_.contains(resource_id)) {
     throw Exception(REPOSITORY_EXCEPTION, "Can only read non-modified 
resource");
   }
   return repository_->read(*resource_id);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 786872600..1d62e3d3a 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -32,6 +32,7 @@
 
 #include "core/ProcessSessionReadCallback.h"
 #include "io/StreamSlice.h"
+#include "io/StreamPipe.h"
 #include "utils/gsl.h"
 
 /* This implementation is only for native Windows systems.  */
@@ -184,10 +185,12 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const FlowFile& parent, in
   if (record) {
     logger_->log_debug("Cloned parent flow files {} to {}, with {}:{}", 
parent.getUUIDStr(), record->getUUIDStr(), offset, size);
     if (parent.getResourceClaim()) {
-      record->setOffset(parent.getOffset() + offset);
-      record->setSize(size);
-      // Copy Resource Claim
-      record->setResourceClaim(parent.getResourceClaim());
+      write(record, [&] (const std::shared_ptr<io::OutputStream>& output) -> 
int64_t {
+        return read(parent, [&] (const std::shared_ptr<io::InputStream>& 
input) -> int64_t {
+          io::StreamSlice slice(input, offset, size);
+          return minifi::internal::pipe(slice, *output);
+        });
+      });
     }
     provenance_report_->clone(parent, *record);
   }
@@ -236,9 +239,13 @@ void ProcessSession::transferToCustomRelationship(const std::shared_ptr<core::Fl
 }
 
 void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const 
io::OutputStreamCallback& callback) {
-  gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID())
-      || added_flowfiles_.contains(flow->getUUID())
-      || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), 
[&flow](const auto& flow_file) { return flow == flow_file; }));
+  return write(*flow, callback);
+}
+
+void ProcessSession::write(core::FlowFile &flow, const 
io::OutputStreamCallback& callback) {
+  gsl_ExpectsAudit(updated_flowfiles_.contains(flow.getUUID())
+      || added_flowfiles_.contains(flow.getUUID())
+      || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), 
[&flow](const auto& flow_file) { return &flow == flow_file.get(); }));
 
   std::shared_ptr<ResourceClaim> claim = content_session_->create();
 
@@ -253,14 +260,14 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile 
content");
     }
 
-    flow->setSize(stream->size());
-    flow->setOffset(0);
-    flow->setResourceClaim(claim);
+    flow.setSize(stream->size());
+    flow.setOffset(0);
+    flow.setResourceClaim(claim);
 
     stream->close();
-    std::string details = process_context_->getProcessorNode()->getName() + " 
modify flow record content " + flow->getUUIDStr();
+    std::string details = process_context_->getProcessorNode()->getName() + " 
modify flow record content " + flow.getUUIDStr();
     auto duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start_time);
-    provenance_report_->modifyContent(*flow, details, duration);
+    provenance_report_->modifyContent(flow, details, duration);
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught Exception during process session write, type: 
{}, what: {}", typeid(exception).name(), exception.what());
     throw;
@@ -351,8 +358,12 @@ std::shared_ptr<io::InputStream> ProcessSession::getFlowFileContentStream(const
 }
 
 int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile>& flow_file, 
const io::InputStreamCallback& callback) {
+  return read(*flow_file, callback);
+}
+
+int64_t ProcessSession::read(const core::FlowFile& flow_file, const 
io::InputStreamCallback& callback) {
   try {
-    auto flow_file_stream = getFlowFileContentStream(*flow_file);
+    auto flow_file_stream = getFlowFileContentStream(flow_file);
     if (!flow_file_stream) {
       return 0;
     }
diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index de52a5115..235fd595c 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -32,6 +32,7 @@ size_t BufferStream::write(const uint8_t *value, size_t size) {
 }
 
 size_t BufferStream::read(std::span<std::byte> buf) {
+  readOffset_ = std::min<uint64_t>(buffer_.size(), readOffset_);
   const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
   const auto readlen = std::min(buf.size(), 
gsl::narrow<size_t>(bytes_available_in_buffer));
   const auto begin = buffer_.begin() + 
gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);

Reply via email to