fgerlits commented on a change in pull request #1236:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1236#discussion_r808924668
##########
File path: extensions/opencv/MotionDetector.cpp
##########
@@ -161,8 +160,16 @@ void MotionDetector::onTrigger(const
std::shared_ptr<core::ProcessContext> &cont
}
cv::Mat frame;
- opencv::FrameReadCallback cb(frame);
- session->read(flow_file, &cb);
+ session->read(flow_file, [&frame](const std::shared_ptr<io::BaseStream>&
inputStream) -> int64_t {
Review comment:
nitpicking, but the parameter should be named `input_stream` (snake_case,
not `inputStream`), and likewise `outputStream` below should be `output_stream`
##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -177,41 +177,15 @@ ReplaceText::Parameters ReplaceText::readParameters(const
std::shared_ptr<core::
namespace {
-struct ReadFlowFileIntoBuffer : public InputStreamCallback {
- std::vector<std::byte> buffer_;
-
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
- buffer_.resize(stream->size());
- size_t bytes_read = stream->read(buffer_);
- return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
- }
-};
-
-struct WriteBufferToFlowFile : public OutputStreamCallback {
- const std::vector<uint8_t>& buffer_;
-
- explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) :
buffer_(buffer) {}
-
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
- size_t bytes_written = stream->write(buffer_, buffer_.size());
- return io::isError(bytes_written) ? -1 :
gsl::narrow<int64_t>(bytes_written);
- }
-};
-
} // namespace
Review comment:
there is nothing left in this anonymous namespace block, so the block itself
can be removed
##########
File path: extensions/opencv/MotionDetector.cpp
##########
@@ -161,8 +160,16 @@ void MotionDetector::onTrigger(const
std::shared_ptr<core::ProcessContext> &cont
}
cv::Mat frame;
- opencv::FrameReadCallback cb(frame);
- session->read(flow_file, &cb);
+ session->read(flow_file, [&frame](const std::shared_ptr<io::BaseStream>&
inputStream) -> int64_t {
+ std::vector<uchar> image_buf;
+ image_buf.resize(inputStream->size());
+ const auto ret =
inputStream->read(gsl::make_span(image_buf).as_span<std::byte>());
+ if (io::isError(ret) || static_cast<std::size_t>(ret) !=
inputStream->size()) {
Review comment:
this cast is no longer needed, as `BaseStream::read()` returns a
`size_t` since one of your earlier refactors
##########
File path: extensions/usb-camera/GetUSBCamera.cpp
##########
@@ -108,25 +108,20 @@ void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr)
{
flow_file->getAttribute("filename", flow_file_name);
cb_data->logger->log_info("Created flow file: %s", flow_file_name);
- // Initialize callback according to output format
- std::shared_ptr<OutputStreamCallback> write_cb;
-
- if (cb_data->format == "PNG") {
- write_cb =
std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx,
-
cb_data->frame_buffer,
-
cb_data->device_width,
-
cb_data->device_height);
- } else if (cb_data->format == "RAW") {
- write_cb =
std::make_shared<GetUSBCamera::RawWriteCallback>(cb_data->frame_buffer);
- } else {
+ if (cb_data->format != "PNG" && cb_data->format != "RAW") {
cb_data->logger->log_warn("Invalid format specified (%s); defaulting to
PNG", cb_data->format);
- write_cb =
std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx,
-
cb_data->frame_buffer,
-
cb_data->device_width,
-
cb_data->device_height);
}
- session->write(flow_file, write_cb.get());
+ if (cb_data->format != "RAW") {
+ // PNG or invalid format defaulting to PNG
+ session->write(flow_file,
GetUSBCamera::PNGWriteCallback{cb_data->png_write_mtx,
+ cb_data->frame_buffer,
+ cb_data->device_width,
+ cb_data->device_height});
+ } else {
+ // RAW
+ session->writeBuffer(flow_file, gsl::make_span(static_cast<const
std::byte*>(cb_data->frame_buffer->data), cb_data->frame_buffer->data_bytes));
+ }
Review comment:
I agree the old version was confusing, but so is the new version, with
all the `!=`s and comments. How about this?
```c++
if (cb_data->format == "RAW") {
session->writeBuffer(flow_file, gsl::make_span(static_cast<const
std::byte*>(cb_data->frame_buffer->data), cb_data->frame_buffer->data_bytes));
} else {
if (cb_data->format != "PNG") {
cb_data->logger->log_warn("Invalid format specified (%s); defaulting
to PNG", cb_data->format);
}
session->write(flow_file, GetUSBCamera::PNGWriteCallback{
cb_data->png_write_mtx,
cb_data->frame_buffer,
cb_data->device_width,
cb_data->device_height});
}
```
##########
File path: libminifi/test/unit/ContentRepositoryDependentTests.h
##########
@@ -87,20 +76,20 @@ class Fixture {
core::ProcessSession &processSession() { return *process_session_; }
+
void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) {
process_session_->transfer(flow_file, Success);
process_session_->commit();
}
+
Review comment:
two new blank lines, probably added by accident
##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -232,12 +232,13 @@ void ProcessSession::transfer(const
std::shared_ptr<core::FlowFile> &flow, Relat
flow->setDeleted(false);
}
-void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow,
OutputStreamCallback *callback) {
+void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const
io::OutputStreamCallback& callback) {
auto flow_file_equality_checker = [&flow](const auto& flow_file) { return
flow == flow_file; };
gsl_ExpectsAudit(_updatedFlowFiles.contains(flow->getUUID())
|| _addedFlowFiles.contains(flow->getUUID())
|| std::any_of(_clonedFlowFiles.begin(), _clonedFlowFiles.end(),
flow_file_equality_checker));
+
Review comment:
blank line probably added by accident
##########
File path: libminifi/include/utils/StringUtils.h
##########
@@ -402,6 +402,7 @@ class StringUtils {
return to_hex(gsl::make_span(str).as_span<const std::byte>(), uppercase);
}
+
Review comment:
blank line probably added by accident
##########
File path: extensions/sftp/processors/PutSFTP.cpp
##########
@@ -436,9 +416,16 @@ bool PutSFTP::processOne(const
std::shared_ptr<core::ProcessContext> &context, c
std::string final_target_path = utils::file::concat_path(remote_path,
resolved_filename, true /*force_posix*/);
logger_->log_debug("The target path is %s, final target path is %s",
target_path.c_str(), final_target_path.c_str());
- ReadCallback read_callback(target_path.c_str(), *client,
conflict_resolution_);
try {
- session->read(flow_file, &read_callback);
+ session->read(flow_file, [&client, &target_path, this](const
std::shared_ptr<io::BaseStream>& stream) {
+ if (!client->putFile(target_path,
+ *stream,
+ conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
+ stream->size() /*expected_size*/)) {
+ throw utils::SFTPException{client->getLastError()};
+ }
+ return stream->size();
Review comment:
you could wrap the returned `stream->size()` in `gsl::narrow<int64_t>(...)`
here, as you did elsewhere
##########
File path: extensions/standard-processors/processors/DefragmentText.cpp
##########
@@ -254,13 +228,14 @@ void DefragmentText::Buffer::append(core::ProcessSession*
session, const gsl::no
store(session, flow_file_to_append);
return;
}
- auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff,
InputStreamCallback* cb) {
+ auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const
io::InputStreamCallback& cb) {
return session->read(ff, cb);
};
PayloadSerializer serializer(flowFileReader);
- AppendFlowFileToFlowFile append_flow_file_to_flow_file(flow_file_to_append,
serializer);
session->add(buffered_flow_file_);
- session->append(buffered_flow_file_, &append_flow_file_to_flow_file);
+ session->append(buffered_flow_file_, [&serializer,
&flow_file_to_append](const auto& outputStream) -> int64_t {
Review comment:
the parameter should be named `output_stream`, not `outputStream`
##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -269,18 +270,16 @@ void ProcessSession::write(const
std::shared_ptr<core::FlowFile> &flow, OutputSt
}
void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, gsl::span<const char> buffer) {
- struct BufferOutputStreamCallback : OutputStreamCallback {
- explicit BufferOutputStreamCallback(gsl::span<const char> buffer)
:buffer{buffer} {}
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) final {
- return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()),
buffer.size());
- }
- gsl::span<const char> buffer;
- };
- BufferOutputStreamCallback cb{ buffer };
- write(flow_file, &cb);
+ write(flow_file, [buffer](const std::shared_ptr<io::BaseStream>&
outputStream) {
+ const auto write_status = outputStream->write(reinterpret_cast<const
uint8_t*>(buffer.data()), buffer.size());
+ return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
+ });
+}
+void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, gsl::span<const std::byte> buffer) {
+ writeBuffer(flow_file, gsl::make_span(reinterpret_cast<const
char*>(buffer.data()), buffer.size()));
}
Review comment:
these two would be slightly simpler as
```c++
void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, gsl::span<const char> buffer) {
writeBuffer(flow_file, buffer.as_span<const std::byte>());
}
```
and
```c++
void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, gsl::span<const std::byte> buffer) {
write(flow_file, [buffer](const std::shared_ptr<io::BaseStream>&
outputStream) {
const auto write_status = outputStream->write(buffer);
return io::isError(write_status) ? -1 :
gsl::narrow<int64_t>(write_status);
});
}
```
##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -322,8 +321,17 @@ void ProcessSession::append(const
std::shared_ptr<core::FlowFile> &flow, OutputS
throw;
}
}
+void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, gsl::span<const char> buffer) {
+ append(flow_file, [buffer](const std::shared_ptr<io::BaseStream>&
outputStream) {
+ const auto write_status = outputStream->write(reinterpret_cast<const
uint8_t*>(buffer.data()), buffer.size());
+ return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
+ });
+}
+void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>&
flow_file, gsl::span<const std::byte> buffer) {
+ appendBuffer(flow_file, gsl::make_span(reinterpret_cast<const
char*>(buffer.data()), buffer.size()));
+}
Review comment:
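same comment as at `writeBuffer`: the `gsl::span<const std::byte>` overload
could call `outputStream->write(buffer)` directly inside `append`, and the
`gsl::span<const char>` overload could delegate to it via
`buffer.as_span<const std::byte>()`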
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org