martinzink commented on code in PR #2148:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2148#discussion_r3008232188
##########
minifi-api/common/include/minifi-cpp/io/StreamCallback.h:
##########
@@ -16,24 +16,101 @@
*/
#pragma once
+#include <cinttypes>
#include <functional>
#include <memory>
#include <optional>
+#include "../../minifi-api/include/minifi-c/minifi-c.h"
+#include "Stream.h"
+#include "minifi-cpp/utils/gsl.h"
+#include "utils/expected.h"
+
namespace org::apache::nifi::minifi::io {
class InputStream;
class OutputStream;
-struct ReadWriteResult {
- int64_t bytes_written = 0;
- int64_t bytes_read = 0;
+class IoResult {
+ public:
+ IoResult() = delete;
+ IoResult(const IoResult&) = default;
+ IoResult(IoResult&&) noexcept = default;
+ IoResult& operator=(IoResult&&) noexcept = default;
+ IoResult& operator=(const IoResult&) = default;
+
+ virtual ~IoResult() = default;
+
+ static IoResult error() { return
IoResult(nonstd::make_unexpected(MINIFI_IO_ERROR)); }
+ static IoResult cancelled() { return
IoResult(nonstd::make_unexpected(MINIFI_IO_CANCEL)); }
+ static IoResult zero() { return IoResult(0U); }
+
+ static IoResult fromI64(int64_t i64_val) {
+ if (i64_val < 0) { return
IoResult(nonstd::make_unexpected(static_cast<MinifiIoStatus>(i64_val))); }
+ return IoResult(gsl::narrow<uint64_t>(i64_val));
+ }
+
+ static IoResult fromSizeT(size_t val) {
+ if (isError(val)) { return IoResult::error(); }
+ return IoResult(gsl::narrow<uint64_t>(val));
+ }
+
+ [[nodiscard]] int64_t toI64() const {
+ if (result_.has_value()) { return gsl::narrow<int64_t>(*result_); }
+ return result_.error();
+ }
+
+ [[nodiscard]] bool is_cancelled() const { return !result_ && result_.error()
== MINIFI_IO_CANCEL; }
+
+ bool operator()() const { return result_.has_value(); }
+ bool operator!() const { return !result_.has_value(); }
+
+ uint64_t operator*() const { return *result_; }
+
+ nonstd::expected<uint64_t, MinifiIoStatus> inner() const { return result_; }
+
+ private:
+ explicit IoResult(nonstd::expected<uint64_t, MinifiIoStatus> result) :
result_(std::move(result)) {}
+
+ nonstd::expected<uint64_t, MinifiIoStatus> result_;
+};
+
+class ReadWriteResult {
+ public:
+ ReadWriteResult() = delete;
+ ReadWriteResult(const ReadWriteResult&) = default;
+ ReadWriteResult(ReadWriteResult&&) noexcept = default;
+ ReadWriteResult& operator=(ReadWriteResult&&) noexcept = default;
+ ReadWriteResult& operator=(const ReadWriteResult&) = default;
+
+ ReadWriteResult(const uint64_t bytes_read, const uint64_t bytes_written)
+ : result_(ReadWrite{.bytes_read = bytes_read, .bytes_written =
bytes_written}) {}
+
+ static ReadWriteResult zero() { return ReadWriteResult(ReadWrite{.bytes_read
= 0, .bytes_written = 0}); };
+ static ReadWriteResult error() { return
ReadWriteResult(nonstd::make_unexpected(MINIFI_IO_ERROR)); }
+ static ReadWriteResult cancelled() { return
ReadWriteResult(nonstd::make_unexpected(MINIFI_IO_CANCEL)); }
+
+ virtual ~ReadWriteResult() = default;
+
+ bool operator()() const { return result_.has_value(); }
+ bool operator!() const { return !result_.has_value(); }
+
+ uint64_t bytesWritten() const { return result_->bytes_written; }
+ uint64_t bytesRead() const { return result_->bytes_read; }
+
+ private:
+ struct ReadWrite {
+ uint64_t bytes_read;
+ uint64_t bytes_written;
+ };
+ explicit ReadWriteResult(nonstd::expected<ReadWrite, MinifiIoStatus> result)
: result_(std::move(result)) {}
+
+ nonstd::expected<ReadWrite, MinifiIoStatus> result_;
};
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-using InputStreamCallback = std::function<int64_t(const
std::shared_ptr<InputStream>& input_stream)>;
-using OutputStreamCallback = std::function<int64_t(const
std::shared_ptr<OutputStream>& output_stream)>;
-using InputOutputStreamCallback =
std::function<std::optional<ReadWriteResult>(const
std::shared_ptr<InputStream>& input_stream, const
std::shared_ptr<OutputStream>& output_stream)>;
+using OutputStreamCallback = std::function<IoResult(const
std::shared_ptr<OutputStream>& output_stream)>;
+using InputStreamCallback = std::function<IoResult(const
std::shared_ptr<InputStream>& output_stream)>;
Review Comment:
https://github.com/apache/nifi-minifi-cpp/pull/2148/changes/d0cc52597cee192b10d9f46d08340029f6a8c6d5#diff-e3c3d53f326186152d21dc0bc5bb5e0339747cf2584945707e5d0141180c6595R112
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]