This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch MINIFICPP-2600 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 859cc6f12454c79bc2a95faa68d5e733e413d61e Author: Gabor Gyimesi <[email protected]> AuthorDate: Tue Aug 5 17:21:28 2025 +0200 MINIFICPP-2600 Change RecordSetReader interface RecordSetReader interface expected a flow file as an input, but in some scenarios the records are read from other sources like a network interface. We change the interface to expect an InputStream as a more generic solution. --- extensions/python/types/PyRecordSetReader.cpp | 7 ++++++- .../standard-processors/controllers/JsonTreeReader.cpp | 12 ++++++------ extensions/standard-processors/controllers/JsonTreeReader.h | 4 +--- extensions/standard-processors/processors/SplitRecord.cpp | 6 +++++- libminifi/test/libtest/unit/RecordSetTesters.h | 11 +++-------- minifi-api/include/minifi-cpp/controllers/RecordSetReader.h | 5 ++--- 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/extensions/python/types/PyRecordSetReader.cpp b/extensions/python/types/PyRecordSetReader.cpp index d36f6a11f..f168aa68a 100644 --- a/extensions/python/types/PyRecordSetReader.cpp +++ b/extensions/python/types/PyRecordSetReader.cpp @@ -86,7 +86,12 @@ PyObject* PyRecordSetReader::read(PyRecordSetReader* self, PyObject* args) { return nullptr; } - auto read_result = record_set_reader->read(flow_file, process_session->getSession()); + nonstd::expected<core::RecordSet, std::error_code> read_result; + process_session->getSession().read(flow_file, [&record_set_reader, &read_result](const std::shared_ptr<io::InputStream>& input_stream) { + read_result = record_set_reader->read(*input_stream); + return gsl::narrow<int64_t>(input_stream->size()); + }); + if (!read_result) { std::string error_message = "failed to read record set with the following error: " + read_result.error().message(); PyErr_SetString(PyExc_RuntimeError, error_message.c_str()); diff --git a/extensions/standard-processors/controllers/JsonTreeReader.cpp b/extensions/standard-processors/controllers/JsonTreeReader.cpp index 1031a5a62..40bfe9f7d 
100644 --- a/extensions/standard-processors/controllers/JsonTreeReader.cpp +++ b/extensions/standard-processors/controllers/JsonTreeReader.cpp @@ -113,14 +113,14 @@ bool readAsArray(const std::string& content, core::RecordSet& record_set) { return true; } -nonstd::expected<core::RecordSet, std::error_code> JsonTreeReader::read(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) { +nonstd::expected<core::RecordSet, std::error_code> JsonTreeReader::read(io::InputStream& input_stream) { core::RecordSet record_set{}; - const auto read_result = session.read(flow_file, [&record_set](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t { + const auto read_result = [&record_set](io::InputStream& input_stream) -> size_t { std::string content; - content.resize(input_stream->size()); - const auto read_ret = gsl::narrow<int64_t>(input_stream->read(as_writable_bytes(std::span(content)))); + content.resize(input_stream.size()); + const auto read_ret = input_stream.read(as_writable_bytes(std::span(content))); if (io::isError(read_ret)) { - return -1; + return io::STREAM_ERROR; } if (content.starts_with('[')) { readAsArray(content, record_set); @@ -128,7 +128,7 @@ nonstd::expected<core::RecordSet, std::error_code> JsonTreeReader::read(const st readAsJsonLines(content, record_set); } return read_ret; - }); + }(input_stream); if (io::isError(read_result)) return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument)); return record_set; diff --git a/extensions/standard-processors/controllers/JsonTreeReader.h b/extensions/standard-processors/controllers/JsonTreeReader.h index 58e8f1407..dc9e9540a 100644 --- a/extensions/standard-processors/controllers/JsonTreeReader.h +++ b/extensions/standard-processors/controllers/JsonTreeReader.h @@ -17,8 +17,6 @@ #pragma once #include "controllers/RecordSetReader.h" -#include "core/FlowFile.h" -#include "core/ProcessSession.h" namespace org::apache::nifi::minifi::standard { @@ -46,7 
+44,7 @@ class JsonTreeReader final : public core::RecordSetReaderImpl { EXTENSIONAPI static constexpr auto ImplementsApis = std::array{ RecordSetReader::ProvidesApi }; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES - nonstd::expected<core::RecordSet, std::error_code> read(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) override; + nonstd::expected<core::RecordSet, std::error_code> read(io::InputStream& input_stream) override; void initialize() override { setSupportedProperties(Properties); diff --git a/extensions/standard-processors/processors/SplitRecord.cpp b/extensions/standard-processors/processors/SplitRecord.cpp index 52016653b..5cc4f3153 100644 --- a/extensions/standard-processors/processors/SplitRecord.cpp +++ b/extensions/standard-processors/processors/SplitRecord.cpp @@ -49,7 +49,11 @@ void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& return; } - auto record_set = record_set_reader_->read(original_flow_file, session); + nonstd::expected<core::RecordSet, std::error_code> record_set; + session.read(original_flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) { + record_set = record_set_reader_->read(*input_stream); + return gsl::narrow<int64_t>(input_stream->size()); + }); if (!record_set) { logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message()); session.transfer(original_flow_file, Failure); diff --git a/libminifi/test/libtest/unit/RecordSetTesters.h b/libminifi/test/libtest/unit/RecordSetTesters.h index 56f64df4b..36bc18de9 100644 --- a/libminifi/test/libtest/unit/RecordSetTesters.h +++ b/libminifi/test/libtest/unit/RecordSetTesters.h @@ -62,15 +62,10 @@ bool testRecordWriter(RecordSetWriter& record_set_writer, const RecordSet& recor } inline bool testRecordReader(RecordSetReader& record_set_reader, const std::string_view serialized_record_set, const RecordSet& expected_record_set) { - const RecordSetFixture 
fixture; - ProcessSession& process_session = fixture.processSession(); - - const auto flow_file = process_session.create(); - process_session.writeBuffer(flow_file, serialized_record_set); - process_session.transfer(flow_file, fixture.getRelationship()); - process_session.commit(); + io::BufferStream buffer_stream; + buffer_stream.write(as_bytes(std::span(serialized_record_set))); - const auto record_set = record_set_reader.read(flow_file, process_session); + const auto record_set = record_set_reader.read(buffer_stream); if (!record_set) return false; diff --git a/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h b/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h index e5d207da2..b60563106 100644 --- a/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h +++ b/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h @@ -19,11 +19,10 @@ #include "core/controller/ControllerService.h" -#include "minifi-cpp/core/FlowFile.h" -#include "minifi-cpp/core/ProcessSession.h" #include "minifi-cpp/core/Record.h" #include "utils/Enum.h" #include "utils/ProcessorConfigUtils.h" +#include "minifi-cpp/io/InputStream.h" namespace org::apache::nifi::minifi::core { @@ -36,7 +35,7 @@ class RecordSetReader : public virtual controller::ControllerService { .type = "org.apache.nifi.minifi.core.RecordSetReader", }; - virtual nonstd::expected<RecordSet, std::error_code> read(const std::shared_ptr<FlowFile>& flow_file, ProcessSession& session) = 0; + virtual nonstd::expected<RecordSet, std::error_code> read(io::InputStream& input_stream) = 0; }; } // namespace org::apache::nifi::minifi::core
