This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit d8eeee0de7fa74adfefcdeeb58f532e788d9c157 Author: Marton Szasz <[email protected]> AuthorDate: Tue Aug 11 03:50:17 2020 +0200 MINIFICPP-1326 improve PublishKafka logging ..., move implementation details to the .cpp file, and other small refactors and improvements Signed-off-by: Arpad Boda <[email protected]> This closes #868 --- Windows.md | 1 + extensions/librdkafka/KafkaConnection.cpp | 2 +- extensions/librdkafka/KafkaConnection.h | 20 +- extensions/librdkafka/PublishKafka.cpp | 348 +++++++++++++++++++++++++++--- extensions/librdkafka/PublishKafka.h | 266 ++--------------------- win_build_vs.bat | 3 + 6 files changed, 352 insertions(+), 288 deletions(-) diff --git a/Windows.md b/Windows.md index d453955..c2400be 100644 --- a/Windows.md +++ b/Windows.md @@ -62,6 +62,7 @@ After the build directory it will take optional parameters modifying the CMake c | /J | Enables JNI | | /64 | Creates 64-bit build instead of a 32-bit one | | /D | Builds RelWithDebInfo build instead of Release | +| /DD | Builds Debug build instead of Release | | /CI | Sets STRICT_GSL_CHECKS to AUDIT | Examples: diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp index cf871a8..3d412ad 100644 --- a/extensions/librdkafka/KafkaConnection.cpp +++ b/extensions/librdkafka/KafkaConnection.cpp @@ -87,7 +87,7 @@ std::shared_ptr<KafkaTopic> KafkaConnection::getTopic(const std::string &topic) return nullptr; } -KafkaConnectionKey const * const KafkaConnection::getKey() const { +KafkaConnectionKey const* KafkaConnection::getKey() const { return &key_; } diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h index 816b40a..8d5b12e 100644 --- a/extensions/librdkafka/KafkaConnection.h +++ b/extensions/librdkafka/KafkaConnection.h @@ -37,14 +37,16 @@ namespace nifi { namespace minifi { namespace processors { -class KafkaConnectionKey { - public: - std::string client_id_; - std::string brokers_; - - bool operator <(const KafkaConnectionKey& rhs) const { - return std::tie(brokers_, client_id_) < std::tie(rhs.brokers_, rhs.client_id_); - } +struct KafkaConnectionKey { + std::string client_id_; + std::string brokers_; + + bool operator< (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) < std::tie(rhs.brokers_, rhs.client_id_); } + bool operator<=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) <= std::tie(rhs.brokers_, rhs.client_id_); } + bool operator==(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) == std::tie(rhs.brokers_, rhs.client_id_); } + bool operator!=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) != std::tie(rhs.brokers_, rhs.client_id_); } + bool operator> (const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) > std::tie(rhs.brokers_, rhs.client_id_); } + bool operator>=(const KafkaConnectionKey& rhs) const { return std::tie(brokers_, client_id_) >= std::tie(rhs.brokers_, rhs.client_id_); } }; class KafkaConnection { @@ -70,7 +72,7 @@ class KafkaConnection { std::shared_ptr<KafkaTopic> getTopic(const std::string &topic) const; - KafkaConnectionKey const * const getKey() const; + KafkaConnectionKey const* getKey() const; void putTopic(const std::string &topicName, const std::shared_ptr<KafkaTopic> &topic); diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index fc5ee56..1b92edc 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -39,6 +39,21 @@ namespace nifi { namespace minifi { namespace processors { +#define COMPRESSION_CODEC_NONE "none" +#define COMPRESSION_CODEC_GZIP "gzip" +#define COMPRESSION_CODEC_SNAPPY "snappy" +#define ROUND_ROBIN_PARTITIONING "Round Robin" +#define RANDOM_PARTITIONING "Random Robin" +#define USER_DEFINED_PARTITIONING "User-Defined" +#define DELIVERY_REPLICATED "all" +#define DELIVERY_ONE_NODE "1" +#define DELIVERY_BEST_EFFORT "0" +#define SECURITY_PROTOCOL_PLAINTEXT "plaintext" +#define SECURITY_PROTOCOL_SSL "ssl" +#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext" +#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl" +#define KAFKA_KEY_ATTRIBUTE "kafka.key" + const core::Property PublishKafka::SeedBrokers( core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>") ->isRequired(true)->supportsExpressionLanguage(true)->build()); @@ -144,6 +159,305 @@ struct rd_kafka_conf_deleter { struct rd_kafka_topic_conf_deleter { void operator()(rd_kafka_topic_conf_t* p) const noexcept { rd_kafka_topic_conf_destroy(p); } }; + +// Message +enum class MessageStatus : uint8_t { + InFlight, + Error, + Success +}; + +const char* to_string(const MessageStatus s) { + switch (s) { + case MessageStatus::InFlight: return "InFlight"; + case MessageStatus::Error: return "Error"; + case MessageStatus::Success: return "Success"; + } + throw std::runtime_error{"PublishKafka to_string(MessageStatus): unreachable code"}; +} + +struct MessageResult { + MessageStatus status = MessageStatus::InFlight; + rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_NO_ERROR; +}; + +struct FlowFileResult { + bool flow_file_error = false; + std::vector<MessageResult> messages; +}; +} // namespace + +class PublishKafka::Messages { + std::mutex mutex_; + std::condition_variable cv_; + std::vector<FlowFileResult> flow_files_; + bool interrupted_ = false; + const std::shared_ptr<logging::Logger> logger_; + + std::string logStatus(const std::unique_lock<std::mutex>& lock) const { + gsl_Expects(lock.owns_lock()); + const auto messageresult_ok = [](const MessageResult r) { return r.status == MessageStatus::Success && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; }; + const auto messageresult_inflight = [](const MessageResult r) { return r.status == MessageStatus::InFlight && r.err_code == RD_KAFKA_RESP_ERR_NO_ERROR; }; + std::vector<size_t> flow_files_in_flight; + std::ostringstream oss; + if (interrupted_) { oss << "interrupted, "; } + for (size_t ffi = 0; ffi < flow_files_.size(); ++ffi) { + const auto& flow_file = flow_files_[ffi]; + if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_ok)) { + continue; // don't log the happy path to reduce log spam + } + if (!flow_file.flow_file_error && std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), messageresult_inflight)) { + flow_files_in_flight.push_back(ffi); + continue; // don't log fully in-flight flow files here, log them at the end instead + } + oss << '[' << ffi << "]: {"; + if (flow_file.flow_file_error) { oss << "error, "; } + for (size_t msgi = 0; msgi < flow_file.messages.size(); ++msgi) { + const auto& msg = flow_file.messages[msgi]; + if (messageresult_ok(msg)) { + continue; + } + oss << '<' << msgi << ">: (msg " << to_string(msg.status) << ", " << rd_kafka_err2str(msg.err_code) << "), "; + } + oss << "}, "; + } + oss << "in-flight (" << flow_files_in_flight.size() << "): " << utils::StringUtils::join(",", flow_files_in_flight); + return oss.str(); + } + + public: + explicit Messages(std::shared_ptr<logging::Logger> logger) :logger_{std::move(logger)} {} + + void waitForCompletion() { + std::unique_lock<std::mutex> lock(mutex_); + cv_.wait(lock, [this, &lock] { + if (logger_->should_log(logging::LOG_LEVEL::trace)) { + logger_->log_trace("%s", logStatus(lock)); + } + return interrupted_ || std::all_of(std::begin(this->flow_files_), std::end(this->flow_files_), [](const FlowFileResult& flow_file) { + return flow_file.flow_file_error || std::all_of(std::begin(flow_file.messages), std::end(flow_file.messages), [](const MessageResult& message) { + return message.status != MessageStatus::InFlight; + }); + }); + }); + } + + template<typename Func> + auto modifyResult(size_t index, Func fun) -> decltype(fun(flow_files_.at(index))) { + std::unique_lock<std::mutex> lock(mutex_); + const auto notifier = gsl::finally([this]{ cv_.notify_all(); }); + try { + return fun(flow_files_.at(index)); + } catch(const std::exception& ex) { + logger_->log_warn("Messages::modifyResult exception: %s", ex.what()); + throw; + } + } + + size_t addFlowFile() { + std::lock_guard<std::mutex> lock(mutex_); + flow_files_.emplace_back(); + return flow_files_.size() - 1; + } + + template<typename Func> + auto iterateFlowFiles(Func fun) -> utils::void_t<decltype(fun(size_t{0}, flow_files_.front()))> { + std::lock_guard<std::mutex> lock(mutex_); + for (size_t index = 0U; index < flow_files_.size(); index++) { + fun(index, flow_files_[index]); + } + } + + void interrupt() { + std::unique_lock<std::mutex> lock(mutex_); + interrupted_ = true; + cv_.notify_all(); + gsl_Ensures(interrupted_); + } + + bool wasInterrupted() { + std::lock_guard<std::mutex> lock(mutex_); + return interrupted_; + } +}; + +namespace { +class ReadCallback : public InputStreamCallback { + public: + struct rd_kafka_headers_deleter { + void operator()(rd_kafka_headers_t* ptr) const noexcept { + rd_kafka_headers_destroy(ptr); + } + }; + + using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>; + + private: + void allocate_message_object(const size_t segment_num) const { + messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) { + // allocate message object to be filled in by the callback in produce() + if (flow_file.messages.size() < segment_num + 1) { + flow_file.messages.resize(segment_num + 1); + } + gsl_Ensures(flow_file.messages.size() > segment_num); + }); + } + + static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) { + const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) }; + if (!result) { throw std::bad_alloc{}; } + + for (const auto& kv : flow_file.getAttributes()) { + if (attribute_name_regex.match(kv.first)) { + rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size()); + } + } + return rd_kafka_headers_unique_ptr{ result }; + } + + rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const { + const std::shared_ptr<PublishKafka::Messages> messages_ptr_copy = this->messages_; + const auto flow_file_index_copy = this->flow_file_index_; + const auto logger = logger_; + const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num, logger](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) { + messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage, logger, flow_file_index_copy](FlowFileResult &flow_file) { + auto &message = flow_file.messages.at(segment_num); + message.err_code = rkmessage->err; + message.status = message.err_code == 0 ? MessageStatus::Success : MessageStatus::Error; + if (message.err_code != RD_KAFKA_RESP_ERR_NO_ERROR) { + logger->log_warn("delivery callback, flow file #%zu/segment #%zu: %s", flow_file_index_copy, segment_num, rd_kafka_err2str(message.err_code)); + } else { + logger->log_debug("delivery callback, flow file #%zu/segment #%zu: success", flow_file_index_copy, segment_num); + } + }); + }; + // release()d below, deallocated in PublishKafka::messageDeliveryCallback + auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback)); + + allocate_message_object(segment_num); + + const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get()); + const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen), + RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END); + if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { + // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback + // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it + (void)callback_ptr.release(); + } else { + // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them + rd_kafka_headers_destroy(hdrs_copy); + } + logger_->log_trace("produce enqueued flow file #%zu/segment #%zu: %s", flow_file_index_, segment_num, rd_kafka_err2str(err)); + return err; + } + + public: + ReadCallback(const uint64_t max_seg_size, + std::string key, + rd_kafka_topic_t* const rkt, + rd_kafka_t* const rk, + const core::FlowFile& flowFile, + utils::Regex& attributeNameRegex, + std::shared_ptr<PublishKafka::Messages> messages, + const size_t flow_file_index, + const bool fail_empty_flow_files, + std::shared_ptr<logging::Logger> logger) + : flow_size_(flowFile.getSize()), + max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size), + key_(std::move(key)), + rkt_(rkt), + rk_(rk), + hdrs(make_headers(flowFile, attributeNameRegex)), + messages_(std::move(messages)), + flow_file_index_(flow_file_index), + fail_empty_flow_files_(fail_empty_flow_files), + logger_(std::move(logger)) + { } + + ReadCallback(const ReadCallback&) = delete; + ReadCallback& operator=(ReadCallback) = delete; + + int64_t process(const std::shared_ptr<io::BaseStream> stream) override { + std::vector<unsigned char> buffer; + + buffer.resize(max_seg_size_); + read_size_ = 0; + status_ = 0; + called_ = true; + + gsl_Expects(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0"); + // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases + const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_); + messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) { + flow_file.messages.reserve(reserved_msg_capacity); + }); + + // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_ + if (flow_size_ == 0 && !fail_empty_flow_files_) { + const auto err = produce(0, buffer, 0); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + status_ = -1; + error_ = rd_kafka_err2str(err); + } + return 0; + } + + for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) { + const int readRet = stream->read(buffer.data(), buffer.size()); + if (readRet < 0) { + status_ = -1; + error_ = "Failed to read from stream"; + return read_size_; + } + + if (readRet <= 0) { break; } + + const auto err = produce(segment_num, buffer, readRet); + if (err) { + messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) { + auto& message = flow_file.messages.at(segment_num); + message.status = MessageStatus::Error; + message.err_code = err; + }); + status_ = -1; + error_ = rd_kafka_err2str(err); + return read_size_; + } + read_size_ += readRet; + } + return read_size_; + } + + const uint64_t flow_size_ = 0; + const uint64_t max_seg_size_ = 0; + const std::string key_; + rd_kafka_topic_t* const rkt_ = nullptr; + rd_kafka_t* const rk_ = nullptr; + const rd_kafka_headers_unique_ptr hdrs; // not null + const std::shared_ptr<PublishKafka::Messages> messages_; + const size_t flow_file_index_; + int status_ = 0; + std::string error_; + int read_size_ = 0; + bool called_ = false; + const bool fail_empty_flow_files_ = true; + const std::shared_ptr<logging::Logger> logger_; +}; + +/** + * Message delivery report callback using the richer rd_kafka_message_t object. + */ +void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) { + if (rkmessage->_private == nullptr) { + return; + } + // allocated in ReadCallback::produce + auto* const func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private); + try { + (*func)(rk, rkmessage); + } catch (...) { } + delete func; +} } // namespace void PublishKafka::initialize() { @@ -235,20 +549,6 @@ void PublishKafka::notifyStop() { conn_.reset(); } -/** - * Message delivery report callback using the richer rd_kafka_message_t object. - */ -void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) { - if (rkmessage->_private == nullptr) { - return; - } - // allocated in PublishKafka::ReadCallback::produce - auto* func = reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private); - try { - (*func)(rk, rkmessage); - } catch (...) { } - delete func; -} bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessContext> &context) { std::string value; @@ -256,14 +556,14 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon std::string valueConf; std::array<char, 512U> errstr{}; rd_kafka_conf_res_t result; - const std::string PREFIX_ERROR_MSG = "PublishKafka: configure error result: "; + const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: "; std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ rd_kafka_conf_new() }; if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); } - auto key = conn_->getKey(); + const auto* const key = conn_->getKey(); if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); @@ -451,7 +751,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon } // Set the delivery callback - rd_kafka_conf_set_dr_msg_cb(conf_.get(), &PublishKafka::messageDeliveryCallback); + rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback); // Set the logger callback rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback); @@ -579,13 +879,13 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex } logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes); - auto messages = std::make_shared<Messages>(); + auto messages = std::make_shared<Messages>(logger_); // We must add this to the messages set, so that it will be interrupted when notifyStop is called { std::lock_guard<std::mutex> lock(messages_mutex_); messages_set_.emplace(messages); } - // We also have to insure that it will be removed once we are done with it + // We also have to ensure that it will be removed once we are done with it const auto messagesSetGuard = gsl::finally([&]() { std::lock_guard<std::mutex> lock(messages_mutex_); messages_set_.erase(messages); @@ -636,8 +936,8 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex bool failEmptyFlowFiles = true; context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles); - PublishKafka::ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile, - attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles); + ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile, + attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_); session->read(flowFile, &callback); if (!callback.called_) { @@ -677,20 +977,20 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) { const auto& message = flow_file.messages[segment_num]; switch (message.status) { - case MessageStatus::MESSAGESTATUS_UNCOMPLETE: + case MessageStatus::InFlight: success = false; logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu", flowFiles[index]->getUUIDStr(), segment_num); break; - case MessageStatus::MESSAGESTATUS_ERROR: + case MessageStatus::Error: success = false; logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s", flowFiles[index]->getUUIDStr(), segment_num, rd_kafka_err2str(message.err_code)); break; - case MessageStatus::MESSAGESTATUS_SUCCESS: + case MessageStatus::Success: logger_->log_debug("Successfully delivered flow file %s segment %zu", flowFiles[index]->getUUIDStr(), segment_num); diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h index 5d16717..7cc9101 100644 --- a/extensions/librdkafka/PublishKafka.h +++ b/extensions/librdkafka/PublishKafka.h @@ -50,36 +50,9 @@ namespace nifi { namespace minifi { namespace processors { -#define COMPRESSION_CODEC_NONE "none" -#define COMPRESSION_CODEC_GZIP "gzip" -#define COMPRESSION_CODEC_SNAPPY "snappy" -#define ROUND_ROBIN_PARTITIONING "Round Robin" -#define RANDOM_PARTITIONING "Random Robin" -#define USER_DEFINED_PARTITIONING "User-Defined" -#define DELIVERY_REPLICATED "all" -#define DELIVERY_ONE_NODE "1" -#define DELIVERY_BEST_EFFORT "0" -#define SECURITY_PROTOCOL_PLAINTEXT "plaintext" -#define SECURITY_PROTOCOL_SSL "ssl" -#define SECURITY_PROTOCOL_SASL_PLAINTEXT "sasl_plaintext" -#define SECURITY_PROTOCOL_SASL_SSL "sasl_ssl" -#define KAFKA_KEY_ATTRIBUTE "kafka.key" - // PublishKafka Class class PublishKafka : public core::Processor { public: - // Constructor - /*! - * Create a new processor - */ - explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier()) - : core::Processor(std::move(name), uuid), - logger_(logging::LoggerFactory<PublishKafka>::getLogger()), - interrupted_(false) { - } - - virtual ~PublishKafka() = default; - static constexpr char const* ProcessorName = "PublishKafka"; // Supported Properties @@ -114,228 +87,13 @@ class PublishKafka : public core::Processor { static const core::Relationship Failure; static const core::Relationship Success; - // Message - enum class MessageStatus : uint8_t { - MESSAGESTATUS_UNCOMPLETE, - MESSAGESTATUS_ERROR, - MESSAGESTATUS_SUCCESS - }; - - struct MessageResult { - MessageStatus status = MessageStatus::MESSAGESTATUS_UNCOMPLETE; - rd_kafka_resp_err_t err_code = RD_KAFKA_RESP_ERR_UNKNOWN; - }; - - struct FlowFileResult { - bool flow_file_error = false; - std::vector<MessageResult> messages; - }; - - struct Messages { - std::mutex mutex; - std::condition_variable cv; - std::vector<FlowFileResult> flow_files; - bool interrupted = false; - - void waitForCompletion() { - std::unique_lock<std::mutex> lock(mutex); - cv.wait(lock, [this]() -> bool { - if (interrupted) { - return true; - } - size_t index = 0U; - return std::all_of(this->flow_files.begin(), this->flow_files.end(), [&](const FlowFileResult& flow_file) { - index++; - if (flow_file.flow_file_error) { - return true; - } - return std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) { - return message.status != MessageStatus::MESSAGESTATUS_UNCOMPLETE; - }); - }); - }); - } - - void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) { - std::unique_lock<std::mutex> lock(mutex); - fun(flow_files.at(index)); - cv.notify_all(); - } - - size_t addFlowFile() { - std::lock_guard<std::mutex> lock(mutex); - flow_files.emplace_back(); - return flow_files.size() - 1; - } - - void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) { - std::lock_guard<std::mutex> lock(mutex); - for (size_t index = 0U; index < flow_files.size(); index++) { - fun(index, flow_files[index]); - } - } - - void interrupt() { - std::unique_lock<std::mutex> lock(mutex); - interrupted = true; - cv.notify_all(); - } - - bool wasInterrupted() { - std::lock_guard<std::mutex> lock(mutex); - return interrupted; - } - }; - - // Nest Callback Class for read stream - class ReadCallback : public InputStreamCallback { - public: - struct rd_kafka_headers_deleter { - void operator()(rd_kafka_headers_t* ptr) const noexcept { - rd_kafka_headers_destroy(ptr); - } - }; - - using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>; - - private: - void allocate_message_object(const size_t segment_num) const { - messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) { - // allocate message object to be filled in by the callback in produce() - if (flow_file.messages.size() < segment_num + 1) { - flow_file.messages.resize(segment_num + 1); - } - }); - } - - static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) { - const gsl::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) }; - if (!result) { throw std::bad_alloc{}; } - - for (const auto& kv : flow_file.getAttributes()) { - if (attribute_name_regex.match(kv.first)) { - rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size()); - } - } - return rd_kafka_headers_unique_ptr{ result }; - } - - rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const { - const std::shared_ptr<Messages> messages_ptr_copy = this->messages_; - const auto flow_file_index_copy = this->flow_file_index_; - const auto produce_callback = [messages_ptr_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) { - messages_ptr_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) { - auto &message = flow_file.messages.at(segment_num); - message.err_code = rkmessage->err; - message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR; - }); - }; - // release()d below, deallocated in PublishKafka::messageDeliveryCallback - auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback)); - - allocate_message_object(segment_num); - - const gsl::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs.get()); - const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen), - RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.get()), RD_KAFKA_V_END); - if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { - // in case of failure, messageDeliveryCallback is not called and callback_ptr will delete the callback - // in case of success, messageDeliveryCallback takes ownership of the callback, so we no longer need to delete it - (void)callback_ptr.release(); - } else { - // in case of failure, rd_kafka_producev doesn't take ownership of the headers, so we need to delete them - rd_kafka_headers_destroy(hdrs_copy); - } - return err; - } - - public: - ReadCallback(const uint64_t max_seg_size, - std::string key, - rd_kafka_topic_t * const rkt, - rd_kafka_t * const rk, - const core::FlowFile& flowFile, - utils::Regex &attributeNameRegex, - std::shared_ptr<Messages> messages, - const size_t flow_file_index, - const bool fail_empty_flow_files) - : flow_size_(flowFile.getSize()), - max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size), - key_(std::move(key)), - rkt_(rkt), - rk_(rk), - hdrs(make_headers(flowFile, attributeNameRegex)), - messages_(std::move(messages)), - flow_file_index_(flow_file_index), - fail_empty_flow_files_(fail_empty_flow_files) - { } - - int64_t process(const std::shared_ptr<io::BaseStream> stream) { - std::vector<unsigned char> buffer; - - buffer.resize(max_seg_size_); - read_size_ = 0; - status_ = 0; - called_ = true; - - assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0"); - // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases - const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_); - messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) { - flow_file.messages.reserve(reserved_msg_capacity); - }); - - // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_ - if (flow_size_ == 0 && !fail_empty_flow_files_) { - produce(0, buffer, 0); - return 0; - } - - for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) { - const int readRet = stream->read(buffer.data(), buffer.size()); - if (readRet < 0) { - status_ = -1; - error_ = "Failed to read from stream"; - return read_size_; - } - - if (readRet <= 0) { break; } + explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier()) + : core::Processor(std::move(name), uuid) { + } - const auto err = produce(segment_num, buffer, readRet); - if (err) { - messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) { - auto& message = flow_file.messages.at(segment_num); - message.status = MessageStatus::MESSAGESTATUS_ERROR; - message.err_code = err; - }); - status_ = -1; - error_ = rd_kafka_err2str(err); - return read_size_; - } - read_size_ += readRet; - } - return read_size_; - } + ~PublishKafka() override = default; - const uint64_t flow_size_ = 0; - const uint64_t max_seg_size_ = 0; - const std::string key_; - rd_kafka_topic_t * const rkt_ = nullptr; - rd_kafka_t * const rk_ = nullptr; - const rd_kafka_headers_unique_ptr hdrs; // not null - const std::shared_ptr<Messages> messages_; - const size_t flow_file_index_; - int status_ = 0; - std::string error_; - int read_size_ = 0; - bool called_ = false; - const bool fail_empty_flow_files_ = true; - }; - - public: - bool supportsDynamicProperties() override { - return true; - } + bool supportsDynamicProperties() override { return true; } /** * Function that's executed when the processor is scheduled. @@ -348,25 +106,25 @@ class PublishKafka : public core::Processor { void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; void notifyStop() override; + class Messages; + protected: bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context); bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name); private: - static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque); - - std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<PublishKafka>::getLogger()}; KafkaConnectionKey key_; std::unique_ptr<KafkaConnection> conn_; std::mutex connection_mutex_; - uint32_t batch_size_; - uint64_t target_batch_payload_size_; - uint64_t max_flow_seg_size_; + uint32_t batch_size_{}; + uint64_t target_batch_payload_size_{}; + uint64_t max_flow_seg_size_{}; utils::Regex attributeNameRegex_; - std::atomic<bool> interrupted_; + std::atomic<bool> interrupted_{false}; std::mutex messages_mutex_; std::set<std::shared_ptr<Messages>> messages_set_; }; diff --git a/win_build_vs.bat b/win_build_vs.bat index a2beefa..3a24778 100644 --- a/win_build_vs.bat +++ b/win_build_vs.bat @@ -64,6 +64,9 @@ for %%x in (%*) do ( if [%%~x] EQU [/D] ( set cmake_build_type=RelWithDebInfo ) + if [%%~x] EQU [/DD] ( + set cmake_build_type=Debug + ) if [%%~x] EQU [/CI] ( set "strict_gsl_checks=-DSTRICT_GSL_CHECKS=AUDIT" )
