This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch MINIFICPP-2600
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 859cc6f12454c79bc2a95faa68d5e733e413d61e
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Aug 5 17:21:28 2025 +0200

    MINIFICPP-2600 Change RecordSetReader interface
    
    The RecordSetReader interface expected a flow file as input, but in some
    scenarios the records are read from other sources, such as a network
    interface. We change the interface to take an InputStream instead, as
    a more generic solution.
---
 extensions/python/types/PyRecordSetReader.cpp                |  7 ++++++-
 .../standard-processors/controllers/JsonTreeReader.cpp       | 12 ++++++------
 extensions/standard-processors/controllers/JsonTreeReader.h  |  4 +---
 extensions/standard-processors/processors/SplitRecord.cpp    |  6 +++++-
 libminifi/test/libtest/unit/RecordSetTesters.h               | 11 +++--------
 minifi-api/include/minifi-cpp/controllers/RecordSetReader.h  |  5 ++---
 6 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/extensions/python/types/PyRecordSetReader.cpp 
b/extensions/python/types/PyRecordSetReader.cpp
index d36f6a11f..f168aa68a 100644
--- a/extensions/python/types/PyRecordSetReader.cpp
+++ b/extensions/python/types/PyRecordSetReader.cpp
@@ -86,7 +86,12 @@ PyObject* PyRecordSetReader::read(PyRecordSetReader* self, 
PyObject* args) {
     return nullptr;
   }
 
-  auto read_result = record_set_reader->read(flow_file, 
process_session->getSession());
+  nonstd::expected<core::RecordSet, std::error_code> read_result;
+  process_session->getSession().read(flow_file, [&record_set_reader, 
&read_result](const std::shared_ptr<io::InputStream>& input_stream) {
+    read_result = record_set_reader->read(*input_stream);
+    return gsl::narrow<int64_t>(input_stream->size());
+  });
+
   if  (!read_result) {
     std::string error_message = "failed to read record set with the following 
error: " + read_result.error().message();
     PyErr_SetString(PyExc_RuntimeError, error_message.c_str());
diff --git a/extensions/standard-processors/controllers/JsonTreeReader.cpp 
b/extensions/standard-processors/controllers/JsonTreeReader.cpp
index 1031a5a62..40bfe9f7d 100644
--- a/extensions/standard-processors/controllers/JsonTreeReader.cpp
+++ b/extensions/standard-processors/controllers/JsonTreeReader.cpp
@@ -113,14 +113,14 @@ bool readAsArray(const std::string& content, 
core::RecordSet& record_set) {
   return true;
 }
 
-nonstd::expected<core::RecordSet, std::error_code> JsonTreeReader::read(const 
std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) {
+nonstd::expected<core::RecordSet, std::error_code> 
JsonTreeReader::read(io::InputStream& input_stream) {
   core::RecordSet record_set{};
-  const auto read_result = session.read(flow_file, [&record_set](const 
std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
+  const auto read_result = [&record_set](io::InputStream& input_stream) -> 
size_t {
     std::string content;
-    content.resize(input_stream->size());
-    const auto read_ret = 
gsl::narrow<int64_t>(input_stream->read(as_writable_bytes(std::span(content))));
+    content.resize(input_stream.size());
+    const auto read_ret = 
input_stream.read(as_writable_bytes(std::span(content)));
     if (io::isError(read_ret)) {
-      return -1;
+      return io::STREAM_ERROR;
     }
     if (content.starts_with('[')) {
       readAsArray(content, record_set);
@@ -128,7 +128,7 @@ nonstd::expected<core::RecordSet, std::error_code> 
JsonTreeReader::read(const st
       readAsJsonLines(content, record_set);
     }
     return read_ret;
-  });
+  }(input_stream);
   if (io::isError(read_result))
     return 
nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
   return record_set;
diff --git a/extensions/standard-processors/controllers/JsonTreeReader.h 
b/extensions/standard-processors/controllers/JsonTreeReader.h
index 58e8f1407..dc9e9540a 100644
--- a/extensions/standard-processors/controllers/JsonTreeReader.h
+++ b/extensions/standard-processors/controllers/JsonTreeReader.h
@@ -17,8 +17,6 @@
 #pragma once
 
 #include "controllers/RecordSetReader.h"
-#include "core/FlowFile.h"
-#include "core/ProcessSession.h"
 
 namespace org::apache::nifi::minifi::standard {
 
@@ -46,7 +44,7 @@ class JsonTreeReader final : public core::RecordSetReaderImpl 
{
   EXTENSIONAPI static constexpr auto ImplementsApis = std::array{ 
RecordSetReader::ProvidesApi };
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
 
-  nonstd::expected<core::RecordSet, std::error_code> read(const 
std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) 
override;
+  nonstd::expected<core::RecordSet, std::error_code> read(io::InputStream& 
input_stream) override;
 
   void initialize() override {
     setSupportedProperties(Properties);
diff --git a/extensions/standard-processors/processors/SplitRecord.cpp 
b/extensions/standard-processors/processors/SplitRecord.cpp
index 52016653b..5cc4f3153 100644
--- a/extensions/standard-processors/processors/SplitRecord.cpp
+++ b/extensions/standard-processors/processors/SplitRecord.cpp
@@ -49,7 +49,11 @@ void SplitRecord::onTrigger(core::ProcessContext& context, 
core::ProcessSession&
     return;
   }
 
-  auto record_set = record_set_reader_->read(original_flow_file, session);
+  nonstd::expected<core::RecordSet, std::error_code> record_set;
+  session.read(original_flow_file, [this, &record_set](const 
std::shared_ptr<io::InputStream>& input_stream) {
+    record_set = record_set_reader_->read(*input_stream);
+    return gsl::narrow<int64_t>(input_stream->size());
+  });
   if (!record_set) {
     logger_->log_error("Failed to read record set from flow file: {}", 
record_set.error().message());
     session.transfer(original_flow_file, Failure);
diff --git a/libminifi/test/libtest/unit/RecordSetTesters.h 
b/libminifi/test/libtest/unit/RecordSetTesters.h
index 56f64df4b..36bc18de9 100644
--- a/libminifi/test/libtest/unit/RecordSetTesters.h
+++ b/libminifi/test/libtest/unit/RecordSetTesters.h
@@ -62,15 +62,10 @@ bool testRecordWriter(RecordSetWriter& record_set_writer, 
const RecordSet& recor
 }
 
 inline bool testRecordReader(RecordSetReader& record_set_reader, const 
std::string_view serialized_record_set, const RecordSet& expected_record_set) {
-  const RecordSetFixture fixture;
-  ProcessSession& process_session = fixture.processSession();
-
-  const auto flow_file = process_session.create();
-  process_session.writeBuffer(flow_file, serialized_record_set);
-  process_session.transfer(flow_file, fixture.getRelationship());
-  process_session.commit();
+  io::BufferStream buffer_stream;
+  buffer_stream.write(as_bytes(std::span(serialized_record_set)));
 
-  const auto record_set = record_set_reader.read(flow_file, process_session);
+  const auto record_set = record_set_reader.read(buffer_stream);
   if (!record_set)
     return false;
 
diff --git a/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h 
b/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h
index e5d207da2..b60563106 100644
--- a/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h
+++ b/minifi-api/include/minifi-cpp/controllers/RecordSetReader.h
@@ -19,11 +19,10 @@
 
 #include "core/controller/ControllerService.h"
 
-#include "minifi-cpp/core/FlowFile.h"
-#include "minifi-cpp/core/ProcessSession.h"
 #include "minifi-cpp/core/Record.h"
 #include "utils/Enum.h"
 #include "utils/ProcessorConfigUtils.h"
+#include "minifi-cpp/io/InputStream.h"
 
 
 namespace org::apache::nifi::minifi::core {
@@ -36,7 +35,7 @@ class RecordSetReader : public virtual 
controller::ControllerService {
     .type = "org.apache.nifi.minifi.core.RecordSetReader",
   };
 
-  virtual nonstd::expected<RecordSet, std::error_code> read(const 
std::shared_ptr<FlowFile>& flow_file, ProcessSession& session) = 0;
+  virtual nonstd::expected<RecordSet, std::error_code> read(io::InputStream& 
input_stream) = 0;
 };
 
 }  // namespace org::apache::nifi::minifi::core

Reply via email to