lordgamez commented on a change in pull request #868:
URL: https://github.com/apache/nifi-minifi-cpp/pull/868#discussion_r469808973
##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -144,6 +159,305 @@ struct rd_kafka_conf_deleter {
struct rd_kafka_topic_conf_deleter {
void operator()(rd_kafka_topic_conf_t* p) const noexcept {
rd_kafka_topic_conf_destroy(p); }
};
+
+// Message
+enum class MessageStatus : uint8_t {
+ InFlight,
+ Error,
+ Success
+};
+
+const char* to_string(const MessageStatus s) {
+ switch (s) {
+ case MessageStatus::InFlight: return "InFlight";
+ case MessageStatus::Error: return "Error";
+ case MessageStatus::Success: return "Success";
+ }
+ throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"};
+}
+
+struct MessageResult {
+ MessageStatus status = MessageStatus::InFlight;
+ rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+ bool flow_file_error = false;
+ std::vector<MessageResult> messages;
+};
+} // namespace
+
+class PublishKafka::Messages {
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::vector<FlowFileResult> flow_files_;
+ bool interrupted_ = false;
+ const std::shared_ptr<logging::Logger> logger_;
+
+ std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+ gsl_Expects(lock.owns_lock());
+ const auto messageresult_ok = [](const MessageResult r) { return r.status
== MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+ const auto messageresult_inflight = [](const MessageResult r) { return
r.status == MessageStatus::InFlight && r.err_code ==
RD_KAFKA_RESP_ERR_NO_ERROR; };
+ std::vector<size_t> flow_files_in_flight;
+ std::ostringstream oss;
+ if (interrupted_) { oss << "interrupted, "; }
+ for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+ const auto& flow_file = flow_files_[ffi];
+ if (!flow_file.flow_file_error &&
std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages),
messageresult_ok)) {
+ continue; // don't log the happy path to reduce log spam
+ }
+ if (!flow_file.flow_file_error &&
std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages),
messageresult_inflight)) {
+ flow_files_in_flight.push_back(ffi);
+ continue; // don't log fully in-flight flow files here, log them at the end instead
+ }
+ oss << '[' << ffi << "]: {";
+ if (flow_file.flow_file_error) { oss << "error, "; }
+ for (size_t msgi = 0; msgi < flow_file.messages.size(); ++msgi) {
+ const auto& msg = flow_file.messages[msgi];
+ if (messageresult_ok(msg)) {
+ continue;
+ }
+ oss << '<' << msgi << ">: (msg " << to_string(msg.status) << ", " <<
rd_kafka_err2str(msg.err_code) << "), ";
+ }
+ oss << "}, ";
+ }
+ oss << "in-flight (" << flow_files_in_flight.size() << "): " <<
utils::StringUtils::join(",", flow_files_in_flight);
+ return oss.str();
+ }
+
+ public:
+ explicit Messages(std::shared_ptr<logging::Logger> logger)
:logger_{std::move(logger)} {}
+
+ void waitForCompletion() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ cv_.wait(lock, [this, &lock] {
+ if (logger_->should_log(logging::LOG_LEVEL::trace)) {
+ logger_->log_trace("%s", logStatus(lock));
+ }
+ return interrupted_ || std::all_of(std::begin(this->flow_files_),
std::end(this->flow_files_), [](const FlowFileResult& flow_file) {
+ return flow_file.flow_file_error ||
std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages),
[](const MessageResult& message) {
Review comment:
Maybe these lambdas could be given names as well.
##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -144,6 +159,305 @@ struct rd_kafka_conf_deleter {
struct rd_kafka_topic_conf_deleter {
void operator()(rd_kafka_topic_conf_t* p) const noexcept {
rd_kafka_topic_conf_destroy(p); }
};
+
+// Message
+enum class MessageStatus : uint8_t {
+ InFlight,
+ Error,
+ Success
+};
+
+const char* to_string(const MessageStatus s) {
+ switch (s) {
+ case MessageStatus::InFlight: return "InFlight";
+ case MessageStatus::Error: return "Error";
+ case MessageStatus::Success: return "Success";
+ }
+ throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"};
+}
+
+struct MessageResult {
+ MessageStatus status = MessageStatus::InFlight;
+ rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+ bool flow_file_error = false;
+ std::vector<MessageResult> messages;
+};
+} // namespace
+
+class PublishKafka::Messages {
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::vector<FlowFileResult> flow_files_;
+ bool interrupted_ = false;
+ const std::shared_ptr<logging::Logger> logger_;
+
+ std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+ gsl_Expects(lock.owns_lock());
+ const auto messageresult_ok = [](const MessageResult r) { return r.status
== MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+ const auto messageresult_inflight = [](const MessageResult r) { return
r.status == MessageStatus::InFlight && r.err_code ==
RD_KAFKA_RESP_ERR_NO_ERROR; };
+ std::vector<size_t> flow_files_in_flight;
+ std::ostringstream oss;
+ if (interrupted_) { oss << "interrupted, "; }
+ for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+ const auto& flow_file = flow_files_[ffi];
+ if (!flow_file.flow_file_error &&
std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages),
messageresult_ok)) {
Review comment:
This condition could be extracted into a named lambda for better readability.
##########
File path: extensions/librdkafka/PublishKafka.cpp
##########
@@ -144,6 +159,305 @@ struct rd_kafka_conf_deleter {
struct rd_kafka_topic_conf_deleter {
void operator()(rd_kafka_topic_conf_t* p) const noexcept {
rd_kafka_topic_conf_destroy(p); }
};
+
+// Message
+enum class MessageStatus : uint8_t {
+ InFlight,
+ Error,
+ Success
+};
+
+const char* to_string(const MessageStatus s) {
+ switch (s) {
+ case MessageStatus::InFlight: return "InFlight";
+ case MessageStatus::Error: return "Error";
+ case MessageStatus::Success: return "Success";
+ }
+ throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"};
+}
+
+struct MessageResult {
+ MessageStatus status = MessageStatus::InFlight;
+ rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR;
+};
+
+struct FlowFileResult {
+ bool flow_file_error = false;
+ std::vector<MessageResult> messages;
+};
+} // namespace
+
+class PublishKafka::Messages {
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::vector<FlowFileResult> flow_files_;
+ bool interrupted_ = false;
+ const std::shared_ptr<logging::Logger> logger_;
+
+ std::string logStatus(const std::unique_lock<std::mutex>& lock) const {
+ gsl_Expects(lock.owns_lock());
+ const auto messageresult_ok = [](const MessageResult r) { return r.status
== MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; };
+ const auto messageresult_inflight = [](const MessageResult r) { return
r.status == MessageStatus::InFlight && r.err_code ==
RD_KAFKA_RESP_ERR_NO_ERROR; };
+ std::vector<size_t> flow_files_in_flight;
+ std::ostringstream oss;
+ if (interrupted_) { oss << "interrupted, "; }
+ for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) {
+ const auto& flow_file = flow_files_[ffi];
+ if (!flow_file.flow_file_error &&
std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages),
messageresult_ok)) {
+ continue; // don't log the happy path to reduce log spam
+ }
+ if (!flow_file.flow_file_error &&
std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages),
messageresult_inflight)) {
Review comment:
This condition could also be extracted into a named lambda for better readability.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]