lordgamez commented on a change in pull request #868:
URL: https://github.com/apache/nifi-minifi-cpp/pull/868#discussion_r469828169
##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -144,6 +159,305 @@ struct rd_kafka_conf_deleter {
struct rd_kafka_topic_conf_deleter {
void operator()(rd_kafka_topic_conf_t* p) const noexcept {
rd_kafka_topic_conf_destroy(p); }
};
+
+// Message
+enum class MessageStatus : uint8_t {
+ InFlight,
+ Error,
+ Success
+};
+
+const char* to_string(const MessageStatus s) {
+ switch (s) {
+ case MessageStatus::InFlight: return "InFlight";
+ case MessageStatus::Error: return "Error";
+ case MessageStatus::Success: return "Success";
+ }
+ throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable
code"};
+}
+
+struct MessageResult {
+ MessageStatus status = MessageStatus::InFlight;
+ rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+ bool flow_file_error = false;
+ std::vector<MessageResult> messages;
+};
+} // namespace
+
+class PublishKafka::Messages {
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::vector<FlowFileResult> flow_files_;
+ bool interrupted_ = false;
+ const std::shared_ptr<logging::Logger> logger_;
+
+ std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+ gsl_Expects(lock.owns_lock());
+ const auto messageresult_ok = [](const MessageResult r) { return r.status
== MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+ const auto messageresult_inflight = [](const MessageResult r) { return
r.status == MessageStatus::InFlight && r.err_code ==
RD_KAFKA_RESP_ERR_NO_ERROR; };
+ std::vector<size_t> flow_files_in_flight;
+ std::ostringstream oss;
+ if (interrupted_) { oss << "interrupted, "; }
+ for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+ const auto& flow_file = flow_files_[ffi];
+ if (!flow_file.flow_file_error &&
std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages),
messageresult_ok)) {
Review comment:
It reads okay, this could be just a very minor improvement. It could be
parameterized to be used for the inflight check as well like this:
```suggestion
if (all_messages_are(messageresult_ok, flow_file))) {
```
It doesn't add too much so you can ignore it if you decide.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]