This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit d4d917afd15fa7760d6f29d2045245244d3b7dc0 Author: Adam Debreceni <[email protected]> AuthorDate: Fri Aug 16 13:45:14 2024 +0000 MINIFICPP-2427 - Copy whole content instead of creating a view Closes #1847 Signed-off-by: Marton Szasz <[email protected]> --- .../rocksdb-repos/tests/ContentSessionTests.cpp | 7 ++-- libminifi/include/core/ProcessSession.h | 4 +++ libminifi/src/core/BufferedContentSession.cpp | 5 ++- libminifi/src/core/ProcessSession.cpp | 37 ++++++++++++++-------- libminifi/src/io/BufferStream.cpp | 1 + 5 files changed, 37 insertions(+), 17 deletions(-) diff --git a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp index 8d35589e1..3a612b758 100644 --- a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp +++ b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp @@ -124,9 +124,10 @@ void test_template() { auto claim1 = session->create(); session->write(claim1) << "hello content!"; - // TODO(adebreceni): MINIFICPP-1954 - if (is_buffered_session) { - REQUIRE_THROWS(session->read(claim1)); + { + std::string content; + session->read(claim1) >> content; + REQUIRE(content == "hello content!"); } auto claim2 = session->create(); diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 912f0bef2..59130a7ce 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -94,10 +94,14 @@ class ProcessSession : public ReferenceContainer { std::shared_ptr<io::InputStream> getFlowFileContentStream(const core::FlowFile& flow_file); // Execute the given read callback against the content int64_t read(const std::shared_ptr<core::FlowFile>& flow_file, const io::InputStreamCallback& callback); + + int64_t read(const core::FlowFile& flow_file, const io::InputStreamCallback& callback); // Read content into buffer detail::ReadBufferResult readBuffer(const std::shared_ptr<core::FlowFile>& flow); // Execute the given write callback against the content void write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback); + + void write(core::FlowFile& flow, const io::OutputStreamCallback& callback); // Read and write the flow file at the same time (eg. for processing it line by line) int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback); // Replace content with buffer diff --git a/libminifi/src/core/BufferedContentSession.cpp b/libminifi/src/core/BufferedContentSession.cpp index 11d370fbc..3afad2c8a 100644 --- a/libminifi/src/core/BufferedContentSession.cpp +++ b/libminifi/src/core/BufferedContentSession.cpp @@ -59,7 +59,10 @@ std::shared_ptr<io::BaseStream> BufferedContentSession::read(const std::shared_p // TODO(adebreceni): // after the stream refactor is merged we should be able to share the underlying buffer // between multiple InputStreams, moreover create a ConcatInputStream - if (managed_resources_.contains(resource_id) || append_state_.contains(resource_id)) { + if (auto it = managed_resources_.find(resource_id); it != managed_resources_.end()) { + return it->second; + } + if (append_state_.contains(resource_id)) { throw Exception(REPOSITORY_EXCEPTION, "Can only read non-modified resource"); } return repository_->read(*resource_id); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 786872600..1d62e3d3a 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -32,6 +32,7 @@ #include "core/ProcessSessionReadCallback.h" #include "io/StreamSlice.h" +#include "io/StreamPipe.h" #include "utils/gsl.h" /* This implementation is only for native Windows systems. */ @@ -184,10 +185,12 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const FlowFile& parent, in if (record) { logger_->log_debug("Cloned parent flow files {} to {}, with {}:{}", parent.getUUIDStr(), record->getUUIDStr(), offset, size); if (parent.getResourceClaim()) { - record->setOffset(parent.getOffset() + offset); - record->setSize(size); - // Copy Resource Claim - record->setResourceClaim(parent.getResourceClaim()); + write(record, [&] (const std::shared_ptr<io::OutputStream>& output) -> int64_t { + return read(parent, [&] (const std::shared_ptr<io::InputStream>& input) -> int64_t { + io::StreamSlice slice(input, offset, size); + return minifi::internal::pipe(slice, *output); + }); + }); } provenance_report_->clone(parent, *record); } @@ -236,9 +239,13 @@ void ProcessSession::transferToCustomRelationship(const std::shared_ptr<core::Fl } void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) { - gsl_ExpectsAudit(updated_flowfiles_.contains(flow->getUUID()) - || added_flowfiles_.contains(flow->getUUID()) - || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), [&flow](const auto& flow_file) { return flow == flow_file; })); + return write(*flow, callback); +} + +void ProcessSession::write(core::FlowFile &flow, const io::OutputStreamCallback& callback) { + gsl_ExpectsAudit(updated_flowfiles_.contains(flow.getUUID()) + || added_flowfiles_.contains(flow.getUUID()) + || std::any_of(cloned_flowfiles_.begin(), cloned_flowfiles_.end(), [&flow](const auto& flow_file) { return &flow == flow_file.get(); })); std::shared_ptr<ResourceClaim> claim = content_session_->create(); @@ -253,14 +260,14 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } - flow->setSize(stream->size()); - flow->setOffset(0); - flow->setResourceClaim(claim); + flow.setSize(stream->size()); + flow.setOffset(0); + flow.setResourceClaim(claim); stream->close(); - std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow->getUUIDStr(); + std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flow.getUUIDStr(); auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time); - provenance_report_->modifyContent(*flow, details, duration); + provenance_report_->modifyContent(flow, details, duration); } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -351,8 +358,12 @@ std::shared_ptr<io::InputStream> ProcessSession::getFlowFileContentStream(const } int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile>& flow_file, const io::InputStreamCallback& callback) { + return read(*flow_file, callback); +} + +int64_t ProcessSession::read(const core::FlowFile& flow_file, const io::InputStreamCallback& callback) { try { - auto flow_file_stream = getFlowFileContentStream(*flow_file); + auto flow_file_stream = getFlowFileContentStream(flow_file); if (!flow_file_stream) { return 0; } diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp index de52a5115..235fd595c 100644 --- a/libminifi/src/io/BufferStream.cpp +++ b/libminifi/src/io/BufferStream.cpp @@ -32,6 +32,7 @@ size_t BufferStream::write(const uint8_t *value, size_t size) { } size_t BufferStream::read(std::span<std::byte> buf) { + readOffset_ = std::min<uint64_t>(buffer_.size(), readOffset_); const auto bytes_available_in_buffer = buffer_.size() - readOffset_; const auto readlen = std::min(buf.size(), gsl::narrow<size_t>(bytes_available_in_buffer)); const auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
